KAFKA-15585: Implement DescribeTopicPartitions RPC on broker (#14612)
This patch implements the new DescribeTopicPartitions RPC as defined in KIP-966 (ELR). Additionally, this patch adds a broker config "max.request.partition.size.limit" which limits the number of partitions returned by the new RPC.
Reviewers: Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dae818c..7df0775 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -44,6 +44,9 @@
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
+ <!-- server tests -->
+ <suppress checks="MethodLength|JavaNCSS|NPath" files="DescribeTopicPartitionsRequestHandlerTest.java"/>
+
<!-- Clients -->
<suppress id="dontUseSystemExit"
files="Exit.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index ee0d773..16bec4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -117,7 +117,8 @@
GET_TELEMETRY_SUBSCRIPTIONS(ApiMessageType.GET_TELEMETRY_SUBSCRIPTIONS),
PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY),
ASSIGN_REPLICAS_TO_DIRS(ApiMessageType.ASSIGN_REPLICAS_TO_DIRS),
- LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES);
+ LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES),
+ DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 23f67cb..b51221f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -324,6 +324,8 @@
return AssignReplicasToDirsRequest.parse(buffer, apiVersion);
case LIST_CLIENT_METRICS_RESOURCES:
return ListClientMetricsResourcesRequest.parse(buffer, apiVersion);
+ case DESCRIBE_TOPIC_PARTITIONS:
+ return DescribeTopicPartitionsRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index f99da4e..dbafdbf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -261,6 +261,8 @@
return AssignReplicasToDirsResponse.parse(responseBuffer, version);
case LIST_CLIENT_METRICS_RESOURCES:
return ListClientMetricsResourcesResponse.parse(responseBuffer, version);
+ case DESCRIBE_TOPIC_PARTITIONS:
+ return DescribeTopicPartitionsResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
new file mode 100644
index 0000000..588c562
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+public class DescribeTopicPartitionsRequest extends AbstractRequest {
+ public static class Builder extends AbstractRequest.Builder<DescribeTopicPartitionsRequest> {
+ private final DescribeTopicPartitionsRequestData data;
+
+ public Builder(DescribeTopicPartitionsRequestData data) {
+ super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS);
+ this.data = data;
+ }
+
+ public Builder(List<String> topics) {
+ super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS, ApiKeys.DESCRIBE_TOPIC_PARTITIONS.oldestVersion(),
+ ApiKeys.DESCRIBE_TOPIC_PARTITIONS.latestVersion());
+ DescribeTopicPartitionsRequestData data = new DescribeTopicPartitionsRequestData();
+ topics.forEach(topicName -> data.topics().add(
+ new DescribeTopicPartitionsRequestData.TopicRequest().setName(topicName))
+ );
+ this.data = data;
+ }
+
+ @Override
+ public DescribeTopicPartitionsRequest build(short version) {
+ return new DescribeTopicPartitionsRequest(data, version);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+
+ }
+
+ private final DescribeTopicPartitionsRequestData data;
+
+ public DescribeTopicPartitionsRequest(DescribeTopicPartitionsRequestData data) {
+ super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS, (short) 0);
+ this.data = data;
+ }
+
+ public DescribeTopicPartitionsRequest(DescribeTopicPartitionsRequestData data, short version) {
+ super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS, version);
+ this.data = data;
+ }
+
+ @Override
+ public DescribeTopicPartitionsRequestData data() {
+ return data;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ Errors error = Errors.forException(e);
+ DescribeTopicPartitionsResponseData responseData = new DescribeTopicPartitionsResponseData();
+ for (DescribeTopicPartitionsRequestData.TopicRequest topic : data.topics()) {
+ responseData.topics().add(new DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic()
+ .setName(topic.name())
+ .setErrorCode(error.code())
+ .setIsInternal(false)
+ .setPartitions(Collections.emptyList())
+ );
+ }
+ responseData.setThrottleTimeMs(throttleTimeMs);
+ return new DescribeTopicPartitionsResponse(responseData);
+ }
+
+ public static DescribeTopicPartitionsRequest parse(ByteBuffer buffer, short version) {
+ return new DescribeTopicPartitionsRequest(
+ new DescribeTopicPartitionsRequestData(new ByteBufferAccessor(buffer), version),
+ version);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsResponse.java
new file mode 100644
index 0000000..e92f03d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsResponse.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DescribeTopicPartitionsResponse extends AbstractResponse {
+ private final DescribeTopicPartitionsResponseData data;
+
+ public DescribeTopicPartitionsResponse(DescribeTopicPartitionsResponseData data) {
+ super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS);
+ this.data = data;
+ }
+
+ @Override
+ public DescribeTopicPartitionsResponseData data() {
+ return data;
+ }
+
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
+ }
+
+ @Override
+ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+ data.setThrottleTimeMs(throttleTimeMs);
+ }
+
+ @Override
+ public boolean shouldClientThrottle(short version) {
+ return true;
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ data.topics().forEach(topicResponse -> {
+ topicResponse.partitions().forEach(p -> updateErrorCounts(errorCounts, Errors.forCode(p.errorCode())));
+ updateErrorCounts(errorCounts, Errors.forCode(topicResponse.errorCode()));
+ });
+ return errorCounts;
+ }
+
+ public static DescribeTopicPartitionsResponse prepareResponse(
+ int throttleTimeMs,
+ List<DescribeTopicPartitionsResponseTopic> topics
+ ) {
+ DescribeTopicPartitionsResponseData responseData = new DescribeTopicPartitionsResponseData();
+ responseData.setThrottleTimeMs(throttleTimeMs);
+ topics.forEach(topicResponse -> responseData.topics().add(topicResponse));
+ return new DescribeTopicPartitionsResponse(responseData);
+ }
+
+ public static DescribeTopicPartitionsResponse parse(ByteBuffer buffer, short version) {
+ return new DescribeTopicPartitionsResponse(
+ new DescribeTopicPartitionsResponseData(new ByteBufferAccessor(buffer), version));
+ }
+}
diff --git a/clients/src/main/resources/common/message/DescribeTopicPartitionsRequest.json b/clients/src/main/resources/common/message/DescribeTopicPartitionsRequest.json
new file mode 100644
index 0000000..63c5b5c
--- /dev/null
+++ b/clients/src/main/resources/common/message/DescribeTopicPartitionsRequest.json
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 75,
+ "type": "request",
+ "listeners": ["broker"],
+ "name": "DescribeTopicPartitionsRequest",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "Topics", "type": "[]TopicRequest", "versions": "0+",
+ "about": "The topics to fetch details for.",
+ "fields": [
+ { "name": "Name", "type": "string", "versions": "0+",
+ "about": "The topic name", "versions": "0+", "entityType": "topicName"}
+ ]
+ },
+ { "name": "ResponsePartitionLimit", "type": "int32", "versions": "0+", "default": "2000",
+ "about": "The maximum number of partitions included in the response." },
+ { "name": "Cursor", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null",
+ "about": "The first topic and partition index to fetch details for.", "fields": [
+ { "name": "TopicName", "type": "string", "versions": "0+",
+ "about": "The name for the first topic to process", "versions": "0+", "entityType": "topicName"},
+ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index to start with"}
+ ]}
+ ]
+}
diff --git a/clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json b/clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json
new file mode 100644
index 0000000..e8eee7d
--- /dev/null
+++ b/clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 75,
+ "type": "response",
+ "name": "DescribeTopicPartitionsResponse",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
+ "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+ { "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", "versions": "0+",
+ "about": "Each topic in the response.", "fields": [
+ { "name": "ErrorCode", "type": "int16", "versions": "0+",
+ "about": "The topic error, or 0 if there was no error." },
+ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+",
+ "about": "The topic name." },
+ { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." },
+ { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true,
+ "about": "True if the topic is internal." },
+ { "name": "Partitions", "type": "[]DescribeTopicPartitionsResponsePartition", "versions": "0+",
+ "about": "Each partition in the topic.", "fields": [
+ { "name": "ErrorCode", "type": "int16", "versions": "0+",
+ "about": "The partition error, or 0 if there was no error." },
+ { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+ "about": "The partition index." },
+ { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+ "about": "The ID of the leader broker." },
+ { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
+ "about": "The leader epoch of this partition." },
+ { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
+ "about": "The set of all nodes that host this partition." },
+ { "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
+ "about": "The set of nodes that are in sync with the leader for this partition." },
+ { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+ "versions": "0+", "nullableVersions": "0+",
+ "about": "The new eligible leader replicas otherwise." },
+ { "name": "LastKnownElr", "type": "[]int32", "default": "null", "entityType": "brokerId",
+ "versions": "0+", "nullableVersions": "0+",
+ "about": "The last known ELR." },
+ { "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", "ignorable": true, "entityType": "brokerId",
+ "about": "The set of offline replicas of this partition." }]},
+ { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
+ "about": "32-bit bitfield to represent authorized operations for this topic." }]
+ },
+ { "name": "NextCursor", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null",
+ "about": "The next topic and partition index to fetch details for.", "fields": [
+ { "name": "TopicName", "type": "string", "versions": "0+",
+ "about": "The name for the first topic to process", "versions": "0+", "entityType": "topicName"},
+ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index to start with"}
+ ]}
+ ]
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 4a19a3a..b1fdf35 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -130,6 +130,8 @@
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.message.DescribeQuorumRequestData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
import org.apache.kafka.common.message.DescribeTransactionsRequestData;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
@@ -1077,6 +1079,7 @@
case PUSH_TELEMETRY: return createPushTelemetryRequest(version);
case ASSIGN_REPLICAS_TO_DIRS: return createAssignReplicasToDirsRequest(version);
case LIST_CLIENT_METRICS_RESOURCES: return createListClientMetricsResourcesRequest(version);
+ case DESCRIBE_TOPIC_PARTITIONS: return createDescribeTopicPartitionsRequest(version);
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@@ -1158,6 +1161,7 @@
case PUSH_TELEMETRY: return createPushTelemetryResponse();
case ASSIGN_REPLICAS_TO_DIRS: return createAssignReplicasToDirsResponse();
case LIST_CLIENT_METRICS_RESOURCES: return createListClientMetricsResourcesResponse();
+ case DESCRIBE_TOPIC_PARTITIONS: return createDescribeTopicPartitionsResponse();
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@@ -1251,6 +1255,41 @@
return new AssignReplicasToDirsResponse(data);
}
+ private DescribeTopicPartitionsRequest createDescribeTopicPartitionsRequest(short version) {
+ DescribeTopicPartitionsRequestData data = new DescribeTopicPartitionsRequestData()
+ .setTopics(Arrays.asList(new DescribeTopicPartitionsRequestData.TopicRequest().setName("foo")))
+ .setCursor(new DescribeTopicPartitionsRequestData.Cursor().setTopicName("foo").setPartitionIndex(1));
+ return new DescribeTopicPartitionsRequest.Builder(data).build(version);
+ }
+
+ private DescribeTopicPartitionsResponse createDescribeTopicPartitionsResponse() {
+ DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopicCollection collection =
+ new DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopicCollection();
+ collection.add(
+ new DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic()
+ .setTopicId(Uuid.fromString("sKhZV8LnTA275KvByB9bVg"))
+ .setErrorCode((short) 0)
+ .setIsInternal(false)
+ .setName("foo")
+ .setTopicAuthorizedOperations(0)
+ .setPartitions(Arrays.asList(
+ new DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition()
+ .setErrorCode((short) 0)
+ .setIsrNodes(Arrays.asList(1))
+ .setPartitionIndex(1)
+ .setLeaderId(1)
+ .setReplicaNodes(Arrays.asList(1))
+ .setLeaderEpoch(0)
+ ))
+ );
+ DescribeTopicPartitionsResponseData data = new DescribeTopicPartitionsResponseData()
+ .setTopics(collection)
+ .setNextCursor(
+ new DescribeTopicPartitionsResponseData.Cursor().setTopicName("foo").setPartitionIndex(2)
+ );
+ return new DescribeTopicPartitionsResponse(data);
+ }
+
private ConsumerGroupHeartbeatRequest createConsumerGroupHeartbeatRequest(short version) {
ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData()
.setGroupId("group")
diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java
new file mode 100644
index 0000000..e9ec3d8
--- /dev/null
+++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.metadata.KRaftMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+ KRaftMetadataCache metadataCache;
+ AuthHelper authHelper;
+ KafkaConfig config;
+
+ public DescribeTopicPartitionsRequestHandler(
+ KRaftMetadataCache metadataCache,
+ AuthHelper authHelper,
+ KafkaConfig config
+ ) {
+ this.metadataCache = metadataCache;
+ this.authHelper = authHelper;
+ this.config = config;
+ }
+
+ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) {
+ DescribeTopicPartitionsRequestData request = ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
+ Set<String> topics = new HashSet<>();
+ boolean fetchAllTopics = request.topics().isEmpty();
+ DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
+ String cursorTopicName = cursor != null ? cursor.topicName() : "";
+ if (fetchAllTopics) {
+ JavaConverters.asJavaCollection(metadataCache.getAllTopics()).forEach(topicName -> {
+ if (topicName.compareTo(cursorTopicName) >= 0) {
+ topics.add(topicName);
+ }
+ });
+ } else {
+ request.topics().forEach(topic -> {
+ String topicName = topic.name();
+ if (topicName.compareTo(cursorTopicName) >= 0) {
+ topics.add(topicName);
+ }
+ });
+
+ if (cursor != null && !topics.contains(cursor.topicName())) {
+ // The topic in cursor must be included in the topic list if provided.
+ throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName());
+ }
+ }
+
+ // Do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not
+ Set<DescribeTopicPartitionsResponseTopic> unauthorizedForDescribeTopicMetadata = new HashSet<>();
+
+ Stream<String> authorizedTopicsStream = topics.stream().sorted().filter(topicName -> {
+ boolean isAuthorized = authHelper.authorize(
+ abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, true, 1);
+ if (!fetchAllTopics && !isAuthorized) {
+ // We should not return topicId when on unauthorized error, so we return zero uuid.
+ unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic(
+ Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, Collections.emptyList())
+ );
+ }
+ return isAuthorized;
+ });
+
+ DescribeTopicPartitionsResponseData response = metadataCache.getTopicMetadataForDescribeTopicResponse(
+ JavaConverters.asScalaIterator(authorizedTopicsStream.iterator()),
+ abstractRequest.context().listenerName,
+ (String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0,
+ Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()),
+ fetchAllTopics
+ );
+
+ // get topic authorized operations
+ response.topics().forEach(topicData ->
+ topicData.setTopicAuthorizedOperations(authHelper.authorizedOperations(abstractRequest, new Resource(TOPIC, topicData.name()))));
+
+ response.topics().addAll(unauthorizedForDescribeTopicMetadata);
+ return response;
+ }
+
+ private DescribeTopicPartitionsResponseTopic describeTopicPartitionsResponseTopic(
+ Errors error,
+ String topic,
+ Uuid topicId,
+ Boolean isInternal,
+ List<DescribeTopicPartitionsResponsePartition> partitionData
+ ) {
+ return new DescribeTopicPartitionsResponseTopic()
+ .setErrorCode(error.code())
+ .setName(topic)
+ .setTopicId(topicId)
+ .setIsInternal(isInternal)
+ .setPartitions(partitionData);
+ }
+}
diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
index e626fe3..bf12037 100644
--- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
+++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
@@ -62,6 +62,7 @@
case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeProducersRequest => DescribeProducersRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version)
+ case res: DescribeTopicPartitionsRequest => DescribeTopicPartitionsRequestDataJsonConverter.write(res.data, request.version)
case req: DescribeTransactionsRequest => DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version)
case res: DescribeUserScramCredentialsRequest => DescribeUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version)
case req: ElectLeadersRequest => ElectLeadersRequestDataJsonConverter.write(req.data, request.version)
@@ -144,6 +145,7 @@
case res: DescribeLogDirsResponse => DescribeLogDirsResponseDataJsonConverter.write(res.data, version)
case res: DescribeProducersResponse => DescribeProducersResponseDataJsonConverter.write(res.data, version)
case res: DescribeQuorumResponse => DescribeQuorumResponseDataJsonConverter.write(res.data, version)
+ case res: DescribeTopicPartitionsResponse => DescribeTopicPartitionsResponseDataJsonConverter.write(res.data, version)
case res: DescribeTransactionsResponse => DescribeTransactionsResponseDataJsonConverter.write(res.data, version)
case res: DescribeUserScramCredentialsResponse => DescribeUserScramCredentialsResponseDataJsonConverter.write(res.data, version)
case res: ElectLeadersResponse => ElectLeadersResponseDataJsonConverter.write(res.data, version)
@@ -182,8 +184,8 @@
case res: UnregisterBrokerResponse => UnregisterBrokerResponseDataJsonConverter.write(res.data, version)
case res: UpdateFeaturesResponse => UpdateFeaturesResponseDataJsonConverter.write(res.data, version)
case res: UpdateMetadataResponse => UpdateMetadataResponseDataJsonConverter.write(res.data, version)
- case res: WriteTxnMarkersResponse => WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
case res: VoteResponse => VoteResponseDataJsonConverter.write(res.data, version)
+ case res: WriteTxnMarkersResponse => WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " +
"code should be updated to do so.");
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 93f55f0..ea66f32 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -22,20 +22,20 @@
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.server.metadata.ConfigRepository
+import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
+import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
import kafka.utils.Implicits._
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.admin.AdminUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry, EndpointType}
-import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.internals.{FatalExitError, Topic}
-import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
-import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResultCollection
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse}
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
@@ -79,8 +79,8 @@
import java.nio.ByteBuffer
import java.time.Duration
import java.util
-import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.{Collections, Optional, OptionalInt}
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
@@ -120,6 +120,11 @@
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", config)
val configManager = new ConfigAdminManager(brokerId, config, configRepository)
+ val describeTopicPartitionsRequestHandler : Option[DescribeTopicPartitionsRequestHandler] = metadataCache match {
+ case kRaftMetadataCache: KRaftMetadataCache =>
+ Some(new DescribeTopicPartitionsRequestHandler(kRaftMetadataCache, authHelper, config))
+ case _ => None
+ }
def close(): Unit = {
aclApis.close()
@@ -247,6 +252,7 @@
case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
+ case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => handleDescribeTopicPartitionsRequest(request)
case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request)
case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request)
@@ -1409,6 +1415,22 @@
))
}
+ def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {
+ describeTopicPartitionsRequestHandler match {
+ case Some(handler) => {
+ val response = handler.handleDescribeTopicPartitionsRequest(request)
+ trace("Sending topic partitions metadata %s for correlation id %d to client %s".format(response.topics().asScala.mkString(","),
+ request.header.correlationId, request.header.clientId))
+
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+ response.setThrottleTimeMs(requestThrottleMs)
+ new DescribeTopicPartitionsResponse(response)
+ })
+ }
+ case None => throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request")
+ }
+ }
+
/**
* Handle an offset fetch request
*/
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fec8519..c8910a5 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -330,6 +330,9 @@
val MaxIncrementalFetchSessionCacheSlots = "max.incremental.fetch.session.cache.slots"
val FetchMaxBytes = "fetch.max.bytes"
+ /** ********* Request Limit Configuration **************/
+ val MaxRequestPartitionSizeLimit = "max.request.partition.size.limit"
+
/** ********* Quota Configuration ***********/
val NumQuotaSamplesProp = "quota.window.num"
val NumReplicationQuotaSamplesProp = "replication.quota.window.num"
@@ -827,6 +830,9 @@
val MaxIncrementalFetchSessionCacheSlotsDoc = "The maximum number of incremental fetch sessions that we will maintain."
val FetchMaxBytesDoc = "The maximum number of bytes we will return for a fetch request. Must be at least 1024."
+ /** ********* Request Limit Configuration **************/
+ val MaxRequestPartitionSizeLimitDoc = "The maximum number of partitions can be served in one request."
+
/** ********* Quota Configuration ***********/
val NumQuotaSamplesDoc = "The number of samples to retain in memory for client quotas"
val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas"
@@ -1184,6 +1190,9 @@
.define(MaxIncrementalFetchSessionCacheSlots, INT, Defaults.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS, atLeast(0), MEDIUM, MaxIncrementalFetchSessionCacheSlotsDoc)
.define(FetchMaxBytes, INT, Defaults.FETCH_MAX_BYTES, atLeast(1024), MEDIUM, FetchMaxBytesDoc)
+ /** ********* Request Limit Configuration ***********/
+ .define(MaxRequestPartitionSizeLimit, INT, Defaults.MAX_REQUEST_PARTITION_SIZE_LIMIT, atLeast(1), MEDIUM, MaxRequestPartitionSizeLimitDoc)
+
/** ********* Kafka Metrics Configuration ***********/
.define(MetricNumSamplesProp, INT, Defaults.METRIC_NUM_SAMPLES, atLeast(1), LOW, MetricNumSamplesDoc)
.define(MetricSampleWindowMsProp, LONG, Defaults.METRIC_SAMPLE_WINDOW_MS, atLeast(1), LOW, MetricSampleWindowMsDoc)
@@ -1882,6 +1891,9 @@
val maxIncrementalFetchSessionCacheSlots = getInt(KafkaConfig.MaxIncrementalFetchSessionCacheSlots)
val fetchMaxBytes = getInt(KafkaConfig.FetchMaxBytes)
+ /** ********* Request Limit Configuration ***********/
+ val maxRequestPartitionSizeLimit = getInt(KafkaConfig.MaxRequestPartitionSizeLimit)
+
val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
def compressionType = getString(KafkaConfig.CompressionTypeProp)
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 485bba8..91fd95f 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -21,27 +21,29 @@
import kafka.server.{CachedControllerId, KRaftCachedControllerId, MetadataCache}
import kafka.utils.Logging
import org.apache.kafka.admin.BrokerMetadata
+import org.apache.kafka.common._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.InvalidTopicException
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.{Cursor, DescribeTopicPartitionsResponsePartition, DescribeTopicPartitionsResponseTopic}
import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
-import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.message._
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.MetadataResponse
import org.apache.kafka.image.MetadataImage
-
-import java.util
-import java.util.{Collections, Properties}
-import java.util.concurrent.ThreadLocalRandom
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData}
-import org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData}
import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, Replicas}
import org.apache.kafka.server.common.{Features, MetadataVersion}
+import java.util
+import java.util.concurrent.ThreadLocalRandom
+import java.util.{Collections, Properties}
+import scala.collection.mutable.ListBuffer
import scala.collection.{Map, Seq, Set, mutable}
-import scala.jdk.CollectionConverters._
import scala.compat.java8.OptionConverters._
+import scala.jdk.CollectionConverters._
+import scala.util.control.Breaks._
class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging with ConfigRepository {
@@ -140,6 +142,73 @@
}
}
+ /**
+ * Return topic partition metadata for the given topic, listener and index range. Also, return the next partition
+ * index that is not included in the result.
+ *
+ * @param image The metadata image
+ * @param topicName The name of the topic.
+ * @param listenerName The listener name.
+ * @param startIndex The smallest index of the partitions to be included in the result.
+ * @param upperIndex The upper limit of the index of the partitions to be included in the result.
+ * Note that, the upper index can be larger than the largest partition index in
+ * this topic.
+ * @return A collection of topic partition metadata and next partition index (-1 means
+ * no next partition).
+ */
+ private def getPartitionMetadataForDescribeTopicResponse(
+ image: MetadataImage,
+ topicName: String,
+ listenerName: ListenerName,
+ startIndex: Int,
+ maxCount: Int
+ ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
+ Option(image.topics().getTopic(topicName)) match {
+ case None => (None, -1)
+ case Some(topic) => {
+ val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
+ val partitions = topic.partitions().keySet()
+ val upperIndex = topic.partitions().size().min(startIndex + maxCount)
+ val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1
+ for (partitionId <- startIndex until upperIndex) {
+ topic.partitions().get(partitionId) match {
+ case partition : PartitionRegistration => {
+ val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas,
+ listenerName, false)
+ val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false)
+ val offlineReplicas = getOfflineReplicas(image, partition, listenerName)
+ val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName)
+ maybeLeader match {
+ case None =>
+ result.append(new DescribeTopicPartitionsResponsePartition()
+ .setPartitionIndex(partitionId)
+ .setLeaderId(MetadataResponse.NO_LEADER_ID)
+ .setLeaderEpoch(partition.leaderEpoch)
+ .setReplicaNodes(filteredReplicas)
+ .setIsrNodes(filteredIsr)
+ .setOfflineReplicas(offlineReplicas)
+ .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+ .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+ case Some(leader) =>
+ result.append(new DescribeTopicPartitionsResponsePartition()
+ .setPartitionIndex(partitionId)
+ .setLeaderId(leader.id())
+ .setLeaderEpoch(partition.leaderEpoch)
+ .setReplicaNodes(filteredReplicas)
+ .setIsrNodes(filteredIsr)
+ .setOfflineReplicas(offlineReplicas)
+ .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+ .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+ }
+ }
+ case _ => warn(s"The partition $partitionId does not exist for $topicName")
+ }
+ }
+ (Some(result.toList), nextIndex)
+ }
+ }
+ }
+
private def getOfflineReplicas(image: MetadataImage,
partition: PartitionRegistration,
listenerName: ListenerName): util.List[Integer] = {
@@ -189,6 +258,86 @@
}
}
+ /**
+ * Get the topic metadata for the given topics.
+ *
+ * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first
+ * partition can't be returned due the limit.
+ * If a topic can't return any partition due to quota limit reached, this topic will not be included in the response.
+ *
+ * Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData
+ * will also be sorted in alphabetical order.
+ *
+ * @param topics The iterator of topics and their corresponding first partition id to fetch.
+ * @param listenerName The listener name.
+ * @param firstTopicPartitionStartIndex The start partition index for the first topic
+ * @param maximumNumberOfPartitions The max number of partitions to return.
+ * @param ignoreTopicsWithExceptions Whether ignore the topics with exception.
+ */
+ def getTopicMetadataForDescribeTopicResponse(
+ topics: Iterator[String],
+ listenerName: ListenerName,
+ topicPartitionStartIndex: String => Int,
+ maximumNumberOfPartitions: Int,
+ ignoreTopicsWithExceptions: Boolean
+ ): DescribeTopicPartitionsResponseData = {
+ val image = _currentImage
+ var remaining = maximumNumberOfPartitions
+ val result = new DescribeTopicPartitionsResponseData()
+ breakable {
+ topics.foreach { topicName =>
+ if (remaining > 0) {
+ val (partitionResponse, nextPartition) =
+ getPartitionMetadataForDescribeTopicResponse(
+ image, topicName, listenerName, topicPartitionStartIndex(topicName), remaining
+ )
+ partitionResponse.map(partitions => {
+ val response = new DescribeTopicPartitionsResponseTopic()
+ .setErrorCode(Errors.NONE.code)
+ .setName(topicName)
+ .setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
+ .setIsInternal(Topic.isInternal(topicName))
+ .setPartitions(partitions.asJava)
+ result.topics().add(response)
+
+ if (nextPartition != -1) {
+ result.setNextCursor(new Cursor()
+ .setTopicName(topicName)
+ .setPartitionIndex(nextPartition)
+ )
+ break()
+ }
+ remaining -= partitions.size
+ })
+
+ if (!ignoreTopicsWithExceptions && !partitionResponse.isDefined) {
+ val error = try {
+ Topic.validate(topicName)
+ Errors.UNKNOWN_TOPIC_OR_PARTITION
+ } catch {
+ case _: InvalidTopicException =>
+ Errors.INVALID_TOPIC_EXCEPTION
+ }
+ result.topics().add(new DescribeTopicPartitionsResponseTopic()
+ .setErrorCode(error.code())
+ .setName(topicName)
+ .setTopicId(getTopicId(topicName))
+ .setIsInternal(Topic.isInternal(topicName)))
+ }
+ } else if (remaining == 0) {
+ // The cursor should point to the beginning of the current topic. All the partitions in the previous topic
+ // should be fulfilled. Note that, if a partition is pointed in the NextTopicPartition, it does not mean
+ // this topic exists.
+ result.setNextCursor(new Cursor()
+ .setTopicName(topicName)
+ .setPartitionIndex(0))
+ break()
+ }
+ }
+ }
+ result
+ }
+
override def getAllTopics(): Set[String] = _currentImage.topics().topicsByName().keySet().asScala
override def getTopicPartitions(topicName: String): Set[TopicPartition] = {
diff --git a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
new file mode 100644
index 0000000..2596054
--- /dev/null
+++ b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.message.UpdateMetadataRequestData;
+import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.metadata.LeaderRecoveryState;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class DescribeTopicPartitionsRequestHandlerTest {
+ private int brokerId = 1;
+ private RequestChannel.Metrics requestChannelMetrics = mock(RequestChannel.Metrics.class);
+ private KafkaPrincipalSerde kafkaPrincipalSerde = new KafkaPrincipalSerde() {
+ @Override
+ public byte[] serialize(KafkaPrincipal principal) throws SerializationException {
+ return Utils.utf8(principal.toString());
+ }
+
+ @Override
+ public KafkaPrincipal deserialize(byte[] bytes) throws SerializationException {
+ return SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes));
+ }
+ };
+
+ ListenerName plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
+ UpdateMetadataBroker broker = new UpdateMetadataBroker()
+ .setId(0)
+ .setRack("rack")
+ .setEndpoints(Arrays.asList(
+ new UpdateMetadataRequestData.UpdateMetadataEndpoint()
+ .setHost("broker0")
+ .setPort(9092)
+ .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+ .setListener(plaintextListener.value())
+ ));
+
+ @Test
+ void testDescribeTopicPartitionsRequest() {
+ // 1. Set up authorizer
+ Authorizer authorizer = mock(Authorizer.class);
+ String unauthorizedTopic = "unauthorized-topic";
+ String authorizedTopic = "authorized-topic";
+ String authorizedNonExistTopic = "authorized-non-exist";
+
+ Action expectedActions1 = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, true, true);
+ Action expectedActions2 = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true);
+ Action expectedActions3 = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedNonExistTopic, PatternType.LITERAL), 1, true, true);
+
+ // Here we need to use AuthHelperTest.matchSameElements instead of EasyMock.eq since the order of the request is unknown
+ when(authorizer.authorize(any(RequestContext.class), argThat(t ->
+ t.contains(expectedActions1) || t.contains(expectedActions2) || t.contains(expectedActions3))))
+ .thenAnswer(invocation -> {
+ List<Action> actions = (List<Action>) invocation.getArgument(1);
+ return actions.stream().map(action -> {
+ if (action.resourcePattern().name().startsWith("authorized"))
+ return AuthorizationResult.ALLOWED;
+ else
+ return AuthorizationResult.DENIED;
+ }).collect(Collectors.toList());
+ });
+
+ // 2. Set up MetadataCache
+ Uuid authorizedTopicId = Uuid.randomUuid();
+ Uuid unauthorizedTopicId = Uuid.randomUuid();
+
+ Map<String, Uuid> topicIds = new HashMap<>();
+ topicIds.put(authorizedTopic, authorizedTopicId);
+ topicIds.put(unauthorizedTopic, unauthorizedTopicId);
+
+ BrokerEndpointCollection collection = new BrokerEndpointCollection();
+ collection.add(new BrokerEndpoint()
+ .setName(broker.endpoints().get(0).listener())
+ .setHost(broker.endpoints().get(0).host())
+ .setPort(broker.endpoints().get(0).port())
+ .setSecurityProtocol(broker.endpoints().get(0).securityProtocol())
+ );
+ List<ApiMessage> records = Arrays.asList(
+ new RegisterBrokerRecord()
+ .setBrokerId(broker.id())
+ .setBrokerEpoch(0)
+ .setIncarnationId(Uuid.randomUuid())
+ .setEndPoints(collection)
+ .setRack(broker.rack())
+ .setFenced(false),
+ new TopicRecord().setName(authorizedTopic).setTopicId(topicIds.get(authorizedTopic)),
+ new TopicRecord().setName(unauthorizedTopic).setTopicId(topicIds.get(unauthorizedTopic)),
+ new PartitionRecord()
+ .setTopicId(authorizedTopicId)
+ .setPartitionId(1)
+ .setReplicas(Arrays.asList(0, 1, 2))
+ .setLeader(0)
+ .setIsr(Arrays.asList(0))
+ .setEligibleLeaderReplicas(Arrays.asList(1))
+ .setLastKnownElr(Arrays.asList(2))
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+ new PartitionRecord()
+ .setTopicId(authorizedTopicId)
+ .setPartitionId(0)
+ .setReplicas(Arrays.asList(0, 1, 2))
+ .setLeader(0)
+ .setIsr(Arrays.asList(0))
+ .setEligibleLeaderReplicas(Arrays.asList(1))
+ .setLastKnownElr(Arrays.asList(2))
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+ new PartitionRecord()
+ .setTopicId(unauthorizedTopicId)
+ .setPartitionId(0)
+ .setReplicas(Arrays.asList(0, 1, 3))
+ .setLeader(0)
+ .setIsr(Arrays.asList(0))
+ .setEligibleLeaderReplicas(Arrays.asList(1))
+ .setLastKnownElr(Arrays.asList(3))
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(2)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
+ );
+ KRaftMetadataCache metadataCache = new KRaftMetadataCache(0);
+ updateKraftMetadataCache(metadataCache, records);
+ DescribeTopicPartitionsRequestHandler handler =
+ new DescribeTopicPartitionsRequestHandler(metadataCache, new AuthHelper(scala.Option.apply(authorizer)), createKafkaDefaultConfig());
+
+ // 3.1 Basic test
+ DescribeTopicPartitionsRequest describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(
+ new DescribeTopicPartitionsRequestData()
+ .setTopics(Arrays.asList(
+ new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
+ new DescribeTopicPartitionsRequestData.TopicRequest().setName(unauthorizedTopic)
+ ))
+ );
+ RequestChannel.Request request;
+ try {
+ request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+ } catch (Exception e) {
+ assertTrue(false, e.getMessage());
+ return;
+ }
+ DescribeTopicPartitionsResponseData response = handler.handleDescribeTopicPartitionsRequest(request);
+ List<DescribeTopicPartitionsResponseTopic> topics = response.topics().valuesList();
+ assertEquals(2, topics.size());
+ DescribeTopicPartitionsResponseTopic topicToCheck = topics.get(0);
+ assertEquals(authorizedTopicId, topicToCheck.topicId());
+ assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+ assertEquals(authorizedTopic, topicToCheck.name());
+ assertEquals(2, topicToCheck.partitions().size());
+
+ topicToCheck = topics.get(1);
+ assertNotEquals(unauthorizedTopicId, topicToCheck.topicId());
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), topicToCheck.errorCode());
+ assertEquals(unauthorizedTopic, topicToCheck.name());
+
+ // 3.2 With cursor
+ describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
+ .setTopics(Arrays.asList(
+ new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
+ new DescribeTopicPartitionsRequestData.TopicRequest().setName(unauthorizedTopic)
+ ))
+ .setCursor(new DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic).setPartitionIndex(1))
+ );
+
+ try {
+ request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+ } catch (Exception e) {
+ assertTrue(false, e.getMessage());
+ return;
+ }
+ response = handler.handleDescribeTopicPartitionsRequest(request);
+ topics = response.topics().valuesList();
+ assertEquals(2, topics.size());
+ topicToCheck = topics.get(0);
+ assertEquals(authorizedTopicId, topicToCheck.topicId());
+ assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+ assertEquals(authorizedTopic, topicToCheck.name());
+ assertEquals(1, topicToCheck.partitions().size());
+
+ topicToCheck = topics.get(1);
+ assertNotEquals(unauthorizedTopicId, topicToCheck.topicId());
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), topicToCheck.errorCode());
+ assertEquals(unauthorizedTopic, topicToCheck.name());
+
+ // 3.3 Fetch all topics
+ describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData());
+ try {
+ request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+ } catch (Exception e) {
+ assertTrue(false, e.getMessage());
+ return;
+ }
+ response = handler.handleDescribeTopicPartitionsRequest(request);
+ topics = response.topics().valuesList();
+ assertEquals(1, topics.size());
+ topicToCheck = topics.get(0);
+ assertEquals(authorizedTopicId, topicToCheck.topicId());
+ assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+ assertEquals(authorizedTopic, topicToCheck.name());
+ assertEquals(2, topicToCheck.partitions().size());
+
+ // 3.4 Fetch all topics with cursor
+ describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(
+ new DescribeTopicPartitionsRequestData().setCursor(
+ new DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic).setPartitionIndex(1)));
+ try {
+ request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+ } catch (Exception e) {
+ assertTrue(false, e.getMessage());
+ return;
+ }
+ response = handler.handleDescribeTopicPartitionsRequest(request);
+ topics = response.topics().valuesList();
+ assertEquals(1, topics.size());
+ topicToCheck = topics.get(0);
+ assertEquals(authorizedTopicId, topicToCheck.topicId());
+ assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+ assertEquals(authorizedTopic, topicToCheck.name());
+ assertEquals(1, topicToCheck.partitions().size());
+
+ // 3.5 Fetch all topics with limit
+ describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(
+ new DescribeTopicPartitionsRequestData().setResponsePartitionLimit(1)
+ );
+ try {
+ request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+ } catch (Exception e) {
+ assertTrue(false, e.getMessage());
+ return;
+ }
+ response = handler.handleDescribeTopicPartitionsRequest(request);
+ topics = response.topics().valuesList();
+ assertEquals(1, topics.size());
+ topicToCheck = topics.get(0);
+ assertEquals(authorizedTopicId, topicToCheck.topicId());
+ assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+ assertEquals(authorizedTopic, topicToCheck.name());
+ assertEquals(1, topicToCheck.partitions().size());
+ assertEquals(authorizedTopic, response.nextCursor().topicName());
+ assertEquals(1, response.nextCursor().partitionIndex());
+ }
+
+ @Test
+ void testDescribeTopicPartitionsRequestWithEdgeCases() {
+ // 1. Set up authorizer
+ Authorizer authorizer = mock(Authorizer.class);
+ String authorizedTopic = "authorized-topic1";
+ String authorizedTopic2 = "authorized-topic2";
+
+ Action expectedActions1 = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true);
+ Action expectedActions2 = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic2, PatternType.LITERAL), 1, true, true);
+
+ // Here we need to use AuthHelperTest.matchSameElements instead of EasyMock.eq since the order of the request is unknown
+ when(authorizer.authorize(any(RequestContext.class), argThat(t ->
+ t.contains(expectedActions1) || t.contains(expectedActions2))))
+ .thenAnswer(invocation -> {
+ List<Action> actions = (List<Action>) invocation.getArgument(1);
+ return actions.stream().map(action -> {
+ if (action.resourcePattern().name().startsWith("authorized"))
+ return AuthorizationResult.ALLOWED;
+ else
+ return AuthorizationResult.DENIED;
+ }).collect(Collectors.toList());
+ });
+
+ // 2. Set up MetadataCache
+ Uuid authorizedTopicId = Uuid.randomUuid();
+ Uuid authorizedTopicId2 = Uuid.randomUuid();
+
+ Map<String, Uuid> topicIds = new HashMap<>();
+ topicIds.put(authorizedTopic, authorizedTopicId);
+ topicIds.put(authorizedTopic2, authorizedTopicId2);
+
+ BrokerEndpointCollection collection = new BrokerEndpointCollection();
+ collection.add(new BrokerEndpoint()
+ .setName(broker.endpoints().get(0).listener())
+ .setHost(broker.endpoints().get(0).host())
+ .setPort(broker.endpoints().get(0).port())
+ .setSecurityProtocol(broker.endpoints().get(0).securityProtocol())
+ );
+ List<ApiMessage> records = Arrays.asList(
+ new RegisterBrokerRecord()
+ .setBrokerId(broker.id())
+ .setBrokerEpoch(0)
+ .setIncarnationId(Uuid.randomUuid())
+ .setEndPoints(collection)
+ .setRack(broker.rack())
+ .setFenced(false),
+ new TopicRecord().setName(authorizedTopic).setTopicId(topicIds.get(authorizedTopic)),
+ new TopicRecord().setName(authorizedTopic2).setTopicId(topicIds.get(authorizedTopic2)),
+ new PartitionRecord()
+ .setTopicId(authorizedTopicId)
+ .setPartitionId(0)
+ .setReplicas(Arrays.asList(0, 1, 2))
+ .setLeader(0)
+ .setIsr(Arrays.asList(0))
+ .setEligibleLeaderReplicas(Arrays.asList(1))
+ .setLastKnownElr(Arrays.asList(2))
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+ new PartitionRecord()
+ .setTopicId(authorizedTopicId)
+ .setPartitionId(1)
+ .setReplicas(Arrays.asList(0, 1, 2))
+ .setLeader(0)
+ .setIsr(Arrays.asList(0))
+ .setEligibleLeaderReplicas(Arrays.asList(1))
+ .setLastKnownElr(Arrays.asList(2))
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+ new PartitionRecord()
+ .setTopicId(authorizedTopicId2)
+ .setPartitionId(0)
+ .setReplicas(Arrays.asList(0, 1, 3))
+ .setLeader(0)
+ .setIsr(Arrays.asList(0))
+ .setEligibleLeaderReplicas(Arrays.asList(1))
+ .setLastKnownElr(Arrays.asList(3))
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(2)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
+ );
+ KRaftMetadataCache metadataCache = new KRaftMetadataCache(0);
+ updateKraftMetadataCache(metadataCache, records);
+ DescribeTopicPartitionsRequestHandler handler =
+ new DescribeTopicPartitionsRequestHandler(metadataCache, new AuthHelper(scala.Option.apply(authorizer)), createKafkaDefaultConfig());
+
+ // 3.1 With cursor point to the first one
+ DescribeTopicPartitionsRequest describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
+ .setTopics(Arrays.asList(
+ new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
+ new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic2)
+ ))
+ .setCursor(new DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic).setPartitionIndex(1))
+ );
+
+ RequestChannel.Request request;
+ try {
+ request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+ } catch (Exception e) {
+ assertTrue(false, e.getMessage());
+ return;
+ }
+ DescribeTopicPartitionsResponseData response = handler.handleDescribeTopicPartitionsRequest(request);
+ List<DescribeTopicPartitionsResponseTopic> topics = response.topics().valuesList();
+ assertEquals(2, topics.size());
+ DescribeTopicPartitionsResponseTopic topicToCheck = topics.get(0);
+ assertEquals(authorizedTopicId, topicToCheck.topicId());
+ assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+ assertEquals(authorizedTopic, topicToCheck.name());
+ assertEquals(1, topicToCheck.partitions().size());
+
+ topicToCheck = topics.get(1);
+ assertEquals(authorizedTopicId2, topicToCheck.topicId());
+ assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+ assertEquals(authorizedTopic2, topicToCheck.name());
+ assertEquals(1, topicToCheck.partitions().size());
+
+ // 3.2 With cursor point to the second one. The first topic should be ignored.
+ describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
+ .setTopics(Arrays.asList(
+ new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
+ new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic2)
+ ))
+ .setCursor(new DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic2).setPartitionIndex(0))
+ );
+
+ try {
+ request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+ } catch (Exception e) {
+ assertTrue(false, e.getMessage());
+ return;
+ }
+ response = handler.handleDescribeTopicPartitionsRequest(request);
+ topics = response.topics().valuesList();
+ assertEquals(1, topics.size());
+ topicToCheck = topics.get(0);
+ assertEquals(authorizedTopicId2, topicToCheck.topicId());
+ assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+ assertEquals(authorizedTopic2, topicToCheck.name());
+ assertEquals(1, topicToCheck.partitions().size());
+
+ // 3.3 With cursor point to a non existing topic. Exception should be thrown if not querying all the topics.
+ describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
+ .setTopics(Arrays.asList(
+ new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
+ new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic2)
+ ))
+ .setCursor(new DescribeTopicPartitionsRequestData.Cursor().setTopicName("Non-existing").setPartitionIndex(0))
+ );
+
+ try {
+ handler.handleDescribeTopicPartitionsRequest(buildRequest(describeTopicPartitionsRequest, plaintextListener));
+ } catch (Exception e) {
+ assertTrue(e instanceof InvalidRequestException, e.getMessage());
+ }
+ }
+
+ void updateKraftMetadataCache(KRaftMetadataCache kRaftMetadataCache, List<ApiMessage> records) {
+ MetadataImage image = kRaftMetadataCache.currentImage();
+ MetadataImage partialImage = new MetadataImage(
+ new MetadataProvenance(100L, 10, 1000L),
+ image.features(),
+ ClusterImage.EMPTY,
+ image.topics(),
+ image.configs(),
+ image.clientQuotas(),
+ image.producerIds(),
+ image.acls(),
+ image.scram(),
+ image.delegationTokens()
+ );
+ MetadataDelta delta = new MetadataDelta.Builder().setImage(partialImage).build();
+ records.stream().forEach(record -> delta.replay(record));
+ kRaftMetadataCache.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L)));
+ }
+
+ private RequestChannel.Request buildRequest(AbstractRequest request,
+ ListenerName listenerName
+ ) throws UnknownHostException {
+ ByteBuffer buffer = request.serializeWithHeader(
+ new RequestHeader(request.apiKey(), request.version(), "test-client", 0));
+
+ // read the header from the buffer first so that the body can be read next from the Request constructor
+ RequestHeader header = RequestHeader.parse(buffer);
+ // DelegationTokens require the context authenticated to be non SecurityProtocol.PLAINTEXT
+ // and have a non KafkaPrincipal.ANONYMOUS principal. This test is done before the check
+ // for forwarding because after forwarding the context will have a different context.
+ // We validate the context authenticated failure case in other integration tests.
+ RequestContext context = new RequestContext(header, "1", InetAddress.getLocalHost(), Optional.empty(), new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Alice"),
+ listenerName, SecurityProtocol.SSL, ClientInformation.EMPTY, false,
+ Optional.of(kafkaPrincipalSerde));
+ return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, buffer,
+ requestChannelMetrics, scala.Option.apply(null));
+ }
+
+ KafkaConfig createKafkaDefaultConfig() {
+ Properties properties = TestUtils.createBrokerConfig(
+ brokerId,
+ "",
+ true,
+ true,
+ TestUtils.RandomPort(),
+ scala.Option.apply(null),
+ scala.Option.apply(null),
+ scala.Option.apply(null),
+ true,
+ false,
+ TestUtils.RandomPort(),
+ false,
+ TestUtils.RandomPort(),
+ false,
+ TestUtils.RandomPort(),
+ scala.Option.apply(null),
+ 1,
+ false,
+ 1,
+ (short) 1,
+ false);
+ properties.put(KafkaConfig.NodeIdProp(), Integer.toString(brokerId));
+ properties.put(KafkaConfig.ProcessRolesProp(), "broker");
+ int voterId = brokerId + 1;
+ properties.put(KafkaConfig.QuorumVotersProp(), voterId + "@localhost:9093");
+ properties.put(KafkaConfig.ControllerListenerNamesProp(), "SSL");
+ TestUtils.setIbpAndMessageFormatVersions(properties, MetadataVersion.latestProduction());
+ return new KafkaConfig(properties);
+ }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e769c7d..3d0f3ac 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -17,12 +17,6 @@
package kafka.server
-import java.net.InetAddress
-import java.nio.charset.StandardCharsets
-import java.util
-import java.util.Arrays.asList
-import java.util.concurrent.{CompletableFuture, TimeUnit}
-import java.util.{Collections, Comparator, Optional, OptionalInt, OptionalLong, Properties}
import kafka.api.LeaderAndIsr
import kafka.cluster.{Broker, Partition}
import kafka.controller.{ControllerContext, KafkaController}
@@ -37,30 +31,28 @@
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => LAlterConfigsResourceCollection}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection => LAlterableConfigCollection}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig => LAlterableConfig}
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource, AlterConfigsResourceCollection => LAlterConfigsResourceCollection, AlterableConfig => LAlterableConfig, AlterableConfigCollection => LAlterableConfigCollection}
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup
+import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResult
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource}
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResourceCollection => IAlterConfigsResourceCollection}
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfig => IAlterableConfig}
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfigCollection => IAlterableConfigCollection}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse}
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
+import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
+import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.message._
@@ -79,32 +71,33 @@
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
-import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Test}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
-import org.mockito.ArgumentMatchers.{any, anyBoolean, anyDouble, anyInt, anyLong, anyShort, anyString, argThat, isNotNull}
-import org.mockito.Mockito.{mock, reset, times, verify, when}
-import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
-
-import scala.collection.{Map, Seq, mutable}
-import scala.jdk.CollectionConverters._
-import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
-import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
-import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
-import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
+import org.apache.kafka.common._
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.ClientMetricsManager
-import org.apache.kafka.server.common.{Features, MetadataVersion}
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
+import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.config.{ConfigType, Defaults}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.util.{FutureUtils, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
+import java.net.InetAddress
+import java.nio.charset.StandardCharsets
import java.time.Duration
+import java.util
+import java.util.Arrays.asList
+import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.{Collections, Comparator, Optional, OptionalInt, OptionalLong, Properties}
+import scala.collection.{Map, Seq, mutable}
+import scala.jdk.CollectionConverters._
class KafkaApisTest extends Logging {
private val requestChannel: RequestChannel = mock(classOf[RequestChannel])
@@ -4146,7 +4139,7 @@
}
}
- /**
+ /**
* Verifies that sending a fetch request with version 9 works correctly when
* ReplicaManager.getLogConfig returns None.
*/
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 8755499..6047ad43 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -16,28 +16,29 @@
*/
package kafka.server
-import org.apache.kafka.common.{DirectoryId, Node, TopicPartition, Uuid}
-
-import java.util
-import java.util.Arrays.asList
-import java.util.Collections
import kafka.api.LeaderAndIsr
import kafka.server.metadata.{KRaftMetadataCache, MetadataSnapshot, ZkMetadataCache}
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState, UpdateMetadataTopicState}
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection}
+import org.apache.kafka.common.metadata._
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{AbstractControlRequest, UpdateMetadataRequest}
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.metadata.{BrokerRegistrationChangeRecord, PartitionRecord, RegisterBrokerRecord, RemoveTopicRecord, TopicRecord}
-import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection}
+import org.apache.kafka.common.{DirectoryId, Node, TopicPartition, Uuid}
import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, MetadataProvenance}
+import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
-import org.junit.jupiter.api.Test
+import java.util
+import java.util.Arrays.asList
+import java.util.Collections
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
@@ -53,7 +54,7 @@
MetadataCache.kRaftMetadataCache(1)
)
- def updateCache(cache: MetadataCache, request: UpdateMetadataRequest): Unit = {
+ def updateCache(cache: MetadataCache, request: UpdateMetadataRequest, records: Seq[ApiMessage] = List()): Unit = {
cache match {
case c: ZkMetadataCache => c.updateMetadata(0, request)
case c: KRaftMetadataCache => {
@@ -126,6 +127,7 @@
request.topicStates().forEach { topic =>
toRecords(topic).foreach(delta.replay)
}
+ records.foreach(delta.replay)
c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L)))
}
case _ => throw new RuntimeException("Unsupported cache type")
@@ -742,6 +744,169 @@
assertEquals(-1L, metadataCache.getAliveBrokerEpoch(1).getOrElse(-1L))
}
+ @Test
+ def testGetTopicMetadataForDescribeTopicPartitionsResponse(): Unit = {
+ val metadataCache = MetadataCache.kRaftMetadataCache(0)
+
+ val controllerId = 2
+ val controllerEpoch = 1
+ val securityProtocol = SecurityProtocol.PLAINTEXT
+ val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+ val topic0 = "test0"
+ val topic1 = "test1"
+
+ val topicIds = new util.HashMap[String, Uuid]()
+ topicIds.put(topic0, Uuid.randomUuid())
+ topicIds.put(topic1, Uuid.randomUuid())
+
+ val partitionMap = Map[(String, Int), PartitionRecord](
+ (topic0, 0) -> new PartitionRecord()
+ .setTopicId(topicIds.get(topic0))
+ .setPartitionId(0)
+ .setReplicas(asList(0, 1, 2))
+ .setLeader(0)
+ .setIsr(asList(0))
+ .setEligibleLeaderReplicas(asList(1))
+ .setLastKnownElr(asList(2))
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+ (topic0, 2) -> new PartitionRecord()
+ .setTopicId(topicIds.get(topic0))
+ .setPartitionId(2)
+ .setReplicas(asList(0, 2, 3))
+ .setLeader(3)
+ .setIsr(asList(3))
+ .setEligibleLeaderReplicas(asList(2))
+ .setLastKnownElr(asList(0))
+ .setLeaderEpoch(1)
+ .setPartitionEpoch(2)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+ (topic0, 1) -> new PartitionRecord()
+ .setTopicId(topicIds.get(topic0))
+ .setPartitionId(1)
+ .setReplicas(asList(0, 1, 3))
+ .setLeader(0)
+ .setIsr(asList(0))
+ .setEligibleLeaderReplicas(asList(1))
+ .setLastKnownElr(asList(3))
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(2)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+ (topic1, 0) -> new PartitionRecord()
+ .setTopicId(topicIds.get(topic1))
+ .setPartitionId(0)
+ .setReplicas(asList(0, 1, 2))
+ .setLeader(2)
+ .setIsr(asList(2))
+ .setEligibleLeaderReplicas(asList(1))
+ .setLastKnownElr(asList(0))
+ .setLeaderEpoch(10)
+ .setPartitionEpoch(11)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+ )
+
+ val brokers = Seq(
+ new UpdateMetadataBroker().setId(0).setEndpoints(Seq(new UpdateMetadataEndpoint().setHost("foo0").setPort(9092).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
+ new UpdateMetadataBroker().setId(1).setEndpoints(Seq(new UpdateMetadataEndpoint().setHost("foo1").setPort(9093).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
+ new UpdateMetadataBroker().setId(2).setEndpoints(Seq(new UpdateMetadataEndpoint().setHost("foo2").setPort(9094).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
+ new UpdateMetadataBroker().setId(3).setEndpoints(Seq(new UpdateMetadataEndpoint().setHost("foo3").setPort(9095).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
+ )
+
+ val version = ApiKeys.UPDATE_METADATA.latestVersion
+ val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch,
+ List[UpdateMetadataPartitionState]().asJava, brokers.asJava, topicIds).build()
+ var recordSeq = Seq[ApiMessage](
+ new TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0)),
+ new TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1))
+ )
+ recordSeq = recordSeq ++ partitionMap.values.toSeq
+ MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest, recordSeq)
+
+ def checkTopicMetadata(topic: String, partitionIds: Set[Int], partitions: mutable.Buffer[DescribeTopicPartitionsResponsePartition]): Unit = {
+ partitions.foreach(partition => {
+ val partitionId = partition.partitionIndex()
+ assertTrue(partitionIds.contains(partitionId))
+ val expectedPartition = partitionMap.get((topic, partitionId)).get
+ assertEquals(0, partition.errorCode())
+ assertEquals(expectedPartition.leaderEpoch(), partition.leaderEpoch())
+ assertEquals(expectedPartition.partitionId(), partition.partitionIndex())
+ assertEquals(expectedPartition.eligibleLeaderReplicas(), partition.eligibleLeaderReplicas())
+ assertEquals(expectedPartition.isr(), partition.isrNodes())
+ assertEquals(expectedPartition.lastKnownElr(), partition.lastKnownElr())
+ assertEquals(expectedPartition.leader(), partition.leaderId())
+ })
+ }
+
+ // Basic test
+ var result = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, topic1).iterator, listenerName, _ => 0, 10, false).topics().asScala.toList
+ assertEquals(2, result.size)
+ var resultTopic = result(0)
+ assertEquals(topic0, resultTopic.name())
+ assertEquals(0, resultTopic.errorCode())
+ assertEquals(topicIds.get(topic0), resultTopic.topicId())
+ assertEquals(3, resultTopic.partitions().size())
+ checkTopicMetadata(topic0, Set(0, 1, 2), resultTopic.partitions().asScala)
+
+ resultTopic = result(1)
+ assertEquals(topic1, resultTopic.name())
+ assertEquals(0, resultTopic.errorCode())
+ assertEquals(topicIds.get(topic1), resultTopic.topicId())
+ assertEquals(1, resultTopic.partitions().size())
+ checkTopicMetadata(topic1, Set(0), resultTopic.partitions().asScala)
+
+ // Quota reached
+ var response = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, topic1).iterator, listenerName, _ => 0, 2, false)
+ result = response.topics().asScala.toList
+ assertEquals(1, result.size)
+ resultTopic = result(0)
+ assertEquals(topic0, resultTopic.name())
+ assertEquals(0, resultTopic.errorCode())
+ assertEquals(topicIds.get(topic0), resultTopic.topicId())
+ assertEquals(2, resultTopic.partitions().size())
+ checkTopicMetadata(topic0, Set(0, 1), resultTopic.partitions().asScala)
+ assertEquals(topic0, response.nextCursor().topicName())
+ assertEquals(2, response.nextCursor().partitionIndex())
+
+ // With start index
+ result = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0).iterator, listenerName, t => if (t.equals(topic0)) 1 else 0, 10, false).topics().asScala.toList
+ assertEquals(1, result.size)
+ resultTopic = result(0)
+ assertEquals(topic0, resultTopic.name())
+ assertEquals(0, resultTopic.errorCode())
+ assertEquals(topicIds.get(topic0), resultTopic.topicId())
+ assertEquals(2, resultTopic.partitions().size())
+ checkTopicMetadata(topic0, Set(1, 2), resultTopic.partitions().asScala)
+
+ // With start index and quota reached
+ response = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, topic1).iterator, listenerName, t => if (t.equals(topic0)) 2 else 0, 1, false)
+ result = response.topics().asScala.toList
+ assertEquals(1, result.size)
+
+ resultTopic = result(0)
+ assertEquals(topic0, resultTopic.name())
+ assertEquals(0, resultTopic.errorCode())
+ assertEquals(topicIds.get(topic0), resultTopic.topicId())
+ assertEquals(1, resultTopic.partitions().size())
+ checkTopicMetadata(topic0, Set(2), resultTopic.partitions().asScala)
+ assertEquals(topic1, response.nextCursor().topicName())
+ assertEquals(0, response.nextCursor().partitionIndex())
+
+ // When the first topic does not exist
+ result = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq("Non-exist", topic0).iterator, listenerName, t => if (t.equals("Non-exist")) 1 else 0, 1, false).topics().asScala.toList
+ assertEquals(2, result.size)
+ resultTopic = result(0)
+ assertEquals("Non-exist", resultTopic.name())
+ assertEquals(3, resultTopic.errorCode())
+
+ resultTopic = result(1)
+ assertEquals(topic0, resultTopic.name())
+ assertEquals(0, resultTopic.errorCode())
+ assertEquals(topicIds.get(topic0), resultTopic.topicId())
+ assertEquals(1, resultTopic.partitions().size())
+ checkTopicMetadata(topic0, Set(0), resultTopic.partitions().asScala)
+ }
+
@ParameterizedTest
@MethodSource(Array("cacheProvider"))
def testGetPartitionInfo(cache: MetadataCache): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index bf9dd7b..d765dda 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -712,6 +712,9 @@
case ApiKeys.LIST_CLIENT_METRICS_RESOURCES =>
new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData())
+ case ApiKeys.DESCRIBE_TOPIC_PARTITIONS =>
+ new DescribeTopicPartitionsRequest.Builder(new DescribeTopicPartitionsRequestData())
+
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index adac0fb..c2a0bbc 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -52,7 +52,7 @@
public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
if (record.isr() != null) return false;
if (record.eligibleLeaderReplicas() != null) return false;
- if (record.lastKnownELR() != null) return false;
+ if (record.lastKnownElr() != null) return false;
if (record.leader() != NO_LEADER_CHANGE) return false;
if (record.replicas() != null) return false;
if (record.removingReplicas() != null) return false;
@@ -492,7 +492,7 @@
if (record.isr() != null && record.isr().isEmpty() && (partition.lastKnownElr.length != 1 ||
partition.lastKnownElr[0] != partition.leader)) {
// Only update the last known leader when the first time the partition becomes leaderless.
- record.setLastKnownELR(Arrays.asList(partition.leader));
+ record.setLastKnownElr(Arrays.asList(partition.leader));
}
}
@@ -518,14 +518,14 @@
}
if (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
- record.setLastKnownELR(targetLastKnownElr);
+ record.setLastKnownElr(targetLastKnownElr);
}
}
private void maybePopulateTargetElr() {
if (!eligibleLeaderReplicasEnabled) return;
- // If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+ // If the ISR is larger or equal to the min ISR, clear the ELR and LastKnownElr.
if (targetIsr.size() >= minISR) {
targetElr = Collections.emptyList();
targetLastKnownElr = Collections.emptyList();
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 97b9a60..72476cf 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -200,7 +200,7 @@
record.leaderEpoch(),
record.partitionEpoch(),
Replicas.toArray(record.eligibleLeaderReplicas()),
- Replicas.toArray(record.lastKnownELR()));
+ Replicas.toArray(record.lastKnownElr()));
}
private PartitionRegistration(int[] replicas, Uuid[] directories, int[] isr, int[] removingReplicas,
@@ -255,7 +255,7 @@
LeaderRecoveryState newLeaderRecoveryState = leaderRecoveryState.changeTo(record.leaderRecoveryState());
int[] newElr = (record.eligibleLeaderReplicas() == null) ? elr : Replicas.toArray(record.eligibleLeaderReplicas());
- int[] newLastKnownElr = (record.lastKnownELR() == null) ? lastKnownElr : Replicas.toArray(record.lastKnownELR());
+ int[] newLastKnownElr = (record.lastKnownElr() == null) ? lastKnownElr : Replicas.toArray(record.lastKnownElr());
return new PartitionRegistration(newReplicas,
defaultToMigrating(newDirectories, replicas.length),
newIsr,
@@ -381,7 +381,7 @@
// The following are tagged fields, we should only set them when there are some contents, in order to save
// spaces.
if (elr.length > 0) record.setEligibleLeaderReplicas(Replicas.toList(elr));
- if (lastKnownElr.length > 0) record.setLastKnownELR(Replicas.toList(lastKnownElr));
+ if (lastKnownElr.length > 0) record.setLastKnownElr(Replicas.toList(lastKnownElr));
}
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
record.setDirectories(Uuid.toList(directories));
diff --git a/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json b/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
index f226699..b6a3c22 100644
--- a/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
+++ b/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
@@ -18,7 +18,7 @@
"type": "metadata",
"name": "PartitionChangeRecord",
// Version 1 adds Directories for KIP-858.
- // Version 2 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
+ // Version 2 implements Eligible Leader Replicas and LastKnownElr as described in KIP-966.
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
@@ -49,8 +49,8 @@
{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 6,
"about": "null if the ELR didn't change; the new eligible leader replicas otherwise." },
- { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
+ { "name": "LastKnownElr", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 7,
- "about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." }
+ "about": "null if the LastKnownElr didn't change; the last known eligible leader replicas otherwise." }
]
}
diff --git a/metadata/src/main/resources/common/metadata/PartitionRecord.json b/metadata/src/main/resources/common/metadata/PartitionRecord.json
index 5c84a2e..c554561 100644
--- a/metadata/src/main/resources/common/metadata/PartitionRecord.json
+++ b/metadata/src/main/resources/common/metadata/PartitionRecord.json
@@ -18,7 +18,7 @@
"type": "metadata",
"name": "PartitionRecord",
// Version 1 adds Directories for KIP-858
- // Version 2 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
+ // Version 2 implements Eligible Leader Replicas and LastKnownElr as described in KIP-966.
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
@@ -47,7 +47,7 @@
{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 1,
"about": "The eligible leader replicas of this partition." },
- { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
+ { "name": "LastKnownElr", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 2,
"about": "The last known eligible leader replicas of this partition." }
]
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index cb00a38..efc9bd2 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -86,7 +86,7 @@
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
setEligibleLeaderReplicas(Arrays.asList(5))));
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
- setLastKnownELR(Arrays.asList(6))));
+ setLastKnownElr(Arrays.asList(6))));
assertFalse(
changeRecordIsNoOp(
new PartitionChangeRecord()
@@ -575,7 +575,7 @@
if (version >= 2) {
// The test partition has ELR, so unclean election will clear these fiedls.
record.setEligibleLeaderReplicas(Collections.emptyList())
- .setLastKnownELR(Collections.emptyList());
+ .setLastKnownElr(Collections.emptyList());
}
expectedRecord = new ApiMessageAndVersion(record, version);
@@ -890,7 +890,7 @@
// Both versions will set the elr and lastKnownElr as empty list.
record.setEligibleLeaderReplicas(Collections.emptyList())
- .setLastKnownELR(Collections.emptyList());
+ .setLastKnownElr(Collections.emptyList());
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
assertEquals(Optional.of(expectedRecord), builder.build());
partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
@@ -935,9 +935,9 @@
.setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
if (version < 2) {
record.setEligibleLeaderReplicas(Collections.emptyList());
- record.setLastKnownELR(Collections.emptyList());
+ record.setLastKnownElr(Collections.emptyList());
}
- // No change is expected to ELR/LastKnownELR.
+ // No change is expected to ELR/LastKnownElr.
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
assertEquals(Optional.of(expectedRecord), builder.build());
partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
@@ -987,7 +987,7 @@
.setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
if (version >= 2) {
record.setEligibleLeaderReplicas(Arrays.asList(2))
- .setLastKnownELR(Arrays.asList(3));
+ .setLastKnownElr(Arrays.asList(3));
} else {
record.setEligibleLeaderReplicas(Collections.emptyList());
}
@@ -1161,7 +1161,7 @@
.setEligibleLeaderReplicas(Arrays.asList(1, 2, 3, 4));
if (lastKnownLeaderEnabled) {
- record.setLastKnownELR(Arrays.asList(1));
+ record.setLastKnownElr(Arrays.asList(1));
}
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
@@ -1178,7 +1178,7 @@
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
PartitionChangeRecord changeRecord = (PartitionChangeRecord) builder.build().get().message();
- assertTrue(changeRecord.lastKnownELR() == null, changeRecord.toString());
+ assertTrue(changeRecord.lastKnownElr() == null, changeRecord.toString());
} else {
assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
}
@@ -1217,7 +1217,7 @@
.setIsr(Arrays.asList(1))
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value())
- .setLastKnownELR(Collections.emptyList()),
+ .setLastKnownElr(Collections.emptyList()),
version
);
assertEquals(Optional.of(expectedRecord), builder.build());
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index 37c3cd1..9de6238 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -320,7 +320,7 @@
if (metadataVersion.isElrSupported()) {
expectRecord.
setEligibleLeaderReplicas(Arrays.asList(2, 3)).
- setLastKnownELR(Arrays.asList(4));
+ setLastKnownElr(Arrays.asList(4));
}
if (metadataVersion.isDirectoryAssignmentSupported()) {
expectRecord.setDirectories(Arrays.asList(
diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
index f6682ac..121f89d 100644
--- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java
+++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
@@ -193,6 +193,9 @@
public static final int MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS = 1000;
public static final int FETCH_MAX_BYTES = 55 * 1024 * 1024;
+ /** ********* Request Limit Configuration ***********/
+ public static final int MAX_REQUEST_PARTITION_SIZE_LIMIT = 2000;
+
/** ********* Quota Configuration *********/
public static final int NUM_QUOTA_SAMPLES = ClientQuotaManagerConfig.DEFAULT_NUM_QUOTA_SAMPLES;
public static final int QUOTA_WINDOW_SIZE_SECONDS = ClientQuotaManagerConfig.DEFAULT_QUOTA_WINDOW_SIZE_SECONDS;