KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator (#14589)
In `KafkaApis.scala`, we build the API response differently if exceptions are thrown during the API execution. Since the new group coordinator only populates the response with an error code instead of throwing an exception when an error occurs, the behavior may differ between the existing group coordinator and the new one.
This patch:
- Fixes the response building in `KafkaApis.scala` for the two APIs affected by such difference -- OffsetFetch and OffsetDelete.
- In `GroupCoordinatorService.java`, returns a response with an error code instead of a failed future when the coordinator is not active.
Reviewers: David Jacot <djacot@confluent.io>
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
index 5da11fd..e8c2196 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
@@ -96,7 +96,10 @@
public Builder merge(
OffsetDeleteResponseData newData
) {
- if (data.topics().isEmpty()) {
+ if (newData.errorCode() != Errors.NONE.code()) {
+ // If the top-level error exists, we can discard the current data and use the new data.
+ data = newData;
+ } else if (data.topics().isEmpty()) {
// If the current data is empty, we can discard it and use the new data.
data = newData;
} else {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 8c3ec12..e70fe11 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -189,6 +189,25 @@
return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e);
}
+ public static TxnOffsetCommitResponseData getErrorResponse(
+ TxnOffsetCommitRequestData request,
+ Errors error
+ ) {
+ TxnOffsetCommitResponseData response = new TxnOffsetCommitResponseData();
+ request.topics().forEach(topic -> {
+ TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic responseTopic = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setName(topic.name());
+ response.topics().add(responseTopic);
+
+ topic.partitions().forEach(partition -> {
+ responseTopic.partitions().add(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ .setErrorCode(error.code()));
+ });
+ });
+ return response;
+ }
+
public static TxnOffsetCommitRequest parse(ByteBuffer buffer, short version) {
return new TxnOffsetCommitRequest(new TxnOffsetCommitRequestData(
new ByteBufferAccessor(buffer), version), version);
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
index d49bdce..6ebe7b0 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
@@ -19,6 +19,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
@@ -32,6 +33,7 @@
import java.util.Map;
import java.util.Optional;
+import static org.apache.kafka.common.requests.TxnOffsetCommitRequest.getErrorResponse;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TxnOffsetCommitRequestTest extends OffsetCommitRequestTest {
@@ -127,4 +129,25 @@
assertEquals(throttleTimeMs, response.throttleTimeMs());
}
}
+
+ @Test
+ @Override
+ public void testGetErrorResponse() {
+ TxnOffsetCommitResponseData expectedResponse = new TxnOffsetCommitResponseData()
+ .setTopics(Arrays.asList(
+ new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setName(topicOne)
+ .setPartitions(Collections.singletonList(
+ new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+ .setPartitionIndex(partitionOne))),
+ new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setName(topicTwo)
+ .setPartitions(Collections.singletonList(
+ new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+ .setPartitionIndex(partitionTwo)))));
+
+ assertEquals(expectedResponse, getErrorResponse(builderWithGroupMetadata.data, Errors.UNKNOWN_MEMBER_ID));
+ }
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 030c056..c29d0b3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1461,6 +1461,8 @@
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(offsetFetchRequest.groupId)
.setErrorCode(Errors.forException(exception).code)
+ } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) {
+ offsetFetchResponse
} else {
// Clients are not allowed to see offsets for topics that are not authorized for Describe.
val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized(
@@ -1500,6 +1502,8 @@
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(offsetFetchRequest.groupId)
.setErrorCode(Errors.forException(exception).code)
+ } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) {
+ offsetFetchResponse
} else {
val topics = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics](
offsetFetchResponse.topics.size + unauthorizedTopics.size
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 29ad468..dd6bd16 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3267,6 +3267,42 @@
assertEquals(Errors.GROUP_ID_NOT_FOUND, Errors.forCode(response.data.errorCode))
}
+ @Test
+ def testOffsetDeleteWithInvalidGroupWithTopLevelError(): Unit = {
+ val group = "groupId"
+ val topic = "topic"
+ addTopicToMetadataCache(topic, numPartitions = 1)
+
+ val offsetDeleteRequest = new OffsetDeleteRequest.Builder(
+ new OffsetDeleteRequestData()
+ .setGroupId(group)
+ .setTopics(new OffsetDeleteRequestTopicCollection(Collections.singletonList(new OffsetDeleteRequestTopic()
+ .setName("topic-unknown")
+ .setPartitions(Collections.singletonList(new OffsetDeleteRequestPartition()
+ .setPartitionIndex(0)
+ ))
+ ).iterator()))
+ ).build()
+ val request = buildRequest(offsetDeleteRequest)
+
+ val future = new CompletableFuture[OffsetDeleteResponseData]()
+ when(groupCoordinator.deleteOffsets(
+ request.context,
+ new OffsetDeleteRequestData().setGroupId(group), // Nonexistent topics won't be passed to groupCoordinator.
+ RequestLocal.NoCaching.bufferSupplier
+ )).thenReturn(future)
+
+ createKafkaApis().handleOffsetDeleteRequest(request, RequestLocal.NoCaching)
+
+ future.complete(new OffsetDeleteResponseData()
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ )
+
+ val response = verifyNoThrottling[OffsetDeleteResponse](request)
+
+ assertEquals(Errors.GROUP_ID_NOT_FOUND, Errors.forCode(response.data.errorCode))
+ }
+
private def testListOffsetFailedGetLeaderReplica(error: Errors): Unit = {
val tp = new TopicPartition("foo", 0)
val isolationLevel = IsolationLevel.READ_UNCOMMITTED
@@ -4301,6 +4337,7 @@
).asJava,
"group-2" -> null,
"group-3" -> null,
+ "group-4" -> null,
).asJava
buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version))
}
@@ -4341,6 +4378,15 @@
false
)).thenReturn(group3Future)
+ val group4Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
+ when(groupCoordinator.fetchAllOffsets(
+ requestChannelRequest.context,
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-4")
+ .setTopics(null),
+ false
+ )).thenReturn(group4Future)
+
createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)
val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
@@ -4385,15 +4431,19 @@
.setGroupId("group-3")
.setErrorCode(Errors.INVALID_GROUP_ID.code)
- val expectedOffsetFetchResponse = new OffsetFetchResponseData()
- .setGroups(List(group1Response, group2Response, group3Response).asJava)
+ val group4Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-4")
+ .setErrorCode(Errors.INVALID_GROUP_ID.code)
+
+ val expectedGroups = List(group1Response, group2Response, group3Response, group4Response)
group1Future.complete(group1Response)
group2Future.complete(group2Response)
group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception)
+ group4Future.complete(group4Response)
val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
- assertEquals(expectedOffsetFetchResponse, response.data)
+ assertEquals(expectedGroups.toSet, response.data.groups().asScala.toSet)
}
}
@@ -4702,6 +4752,90 @@
}
@Test
+ def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = {
+ def makeRequest(version: Short): RequestChannel.Request = {
+ val groups = Map(
+ "group-1" -> List(
+ new TopicPartition("foo", 0),
+ new TopicPartition("bar", 0)
+ ).asJava,
+ "group-2" -> List(
+ new TopicPartition("foo", 0),
+ new TopicPartition("bar", 0)
+ ).asJava
+ ).asJava
+ buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version))
+ }
+
+ val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+
+ val acls = Map(
+ "group-1" -> AuthorizationResult.ALLOWED,
+ "group-2" -> AuthorizationResult.ALLOWED,
+ "foo" -> AuthorizationResult.DENIED,
+ "bar" -> AuthorizationResult.ALLOWED
+ )
+
+ when(authorizer.authorize(
+ any[RequestContext],
+ any[util.List[Action]]
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument(1, classOf[util.List[Action]])
+ actions.asScala.map { action =>
+ acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+ }.asJava
+ }
+
+ // group-1 and group-2 are allowed and bar is allowed.
+ val group1Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
+ when(groupCoordinator.fetchOffsets(
+ requestChannelRequest.context,
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-1")
+ .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(List[Integer](0).asJava)).asJava),
+ false
+ )).thenReturn(group1Future)
+
+ val group2Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
+ when(groupCoordinator.fetchOffsets(
+ requestChannelRequest.context,
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-2")
+ .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(List[Integer](0).asJava)).asJava),
+ false
+ )).thenReturn(group1Future)
+
+ createKafkaApis(authorizer = Some(authorizer)).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+ // group-2 mocks using the new group coordinator.
+ // When the coordinator is not active, a response with top-level error code is returned
+ // even though the requested topic is not authorized and fails.
+ val group2ResponseFromCoordinator = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-2")
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code)
+
+ val expectedOffsetFetchResponse = new OffsetFetchResponseData()
+ .setGroups(List(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-1")
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code),
+ group2ResponseFromCoordinator
+ ).asJava)
+
+ group1Future.completeExceptionally(Errors.COORDINATOR_NOT_AVAILABLE.exception)
+ group2Future.complete(group2ResponseFromCoordinator)
+
+ val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+ assertEquals(expectedOffsetFetchResponse, response.data)
+ }
+
+ @Test
def testReassignmentAndReplicationBytesOutRateWhenReassigning(): Unit = {
assertReassignmentAndReplicationBytesOutPerSec(true)
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 0476413..291afdc 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -62,7 +62,8 @@
* @param context The request context.
* @param request The ConsumerGroupHeartbeatResponse data.
*
- * @return A future yielding the response or an exception.
+ * @return A future yielding the response.
+ * The error code(s) of the response are set to indicate the error(s) that occurred during the execution.
*/
CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
RequestContext context,
@@ -76,7 +77,8 @@
* @param request The JoinGroupRequest data.
* @param bufferSupplier The buffer supplier tight to the request thread.
*
- * @return A future yielding the response or an exception.
+ * @return A future yielding the response.
+ * The error code(s) of the response are set to indicate the error(s) that occurred during the execution.
*/
CompletableFuture<JoinGroupResponseData> joinGroup(
RequestContext context,
@@ -91,7 +93,8 @@
* @param request The SyncGroupRequest data.
* @param bufferSupplier The buffer supplier tight to the request thread.
*
- * @return A future yielding the response or an exception.
+ * @return A future yielding the response.
+ * The error code(s) of the response are set to indicate the error(s) that occurred during the execution.
*/
CompletableFuture<SyncGroupResponseData> syncGroup(
RequestContext context,
@@ -105,7 +108,8 @@
* @param context The coordinator request context.
* @param request The HeartbeatRequest data.
*
- * @return A future yielding the response or an exception.
+ * @return A future yielding the response.
+ * The error code(s) of the response are set to indicate the error(s) that occurred during the execution.
*/
CompletableFuture<HeartbeatResponseData> heartbeat(
RequestContext context,
@@ -118,7 +122,8 @@
* @param context The coordinator request context.
* @param request The LeaveGroupRequest data.
*
- * @return A future yielding the response or an exception.
+ * @return A future yielding the response.
+ * The error code(s) of the response are set to indicate the error(s) that occurred during the execution.
*/
CompletableFuture<LeaveGroupResponseData> leaveGroup(
RequestContext context,
@@ -131,7 +136,8 @@
* @param context The coordinator request context.
* @param request The ListGroupRequest data.
*
- * @return A future yielding the response or an exception.
+ * @return A future yielding the response.
+ * The error code(s) of the response are set to indicate the error(s) that occurred during the execution.
*/
CompletableFuture<ListGroupsResponseData> listGroups(
RequestContext context,
@@ -144,7 +150,8 @@
* @param context The coordinator request context.
* @param groupIds The group ids.
*
- * @return A future yielding the results or an exception.
+ * @return A future yielding the results.
+ * The error codes of the results are set to indicate the errors that occurred during the execution.
*/
CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> describeGroups(
RequestContext context,
@@ -158,7 +165,8 @@
* @param groupIds The group ids.
* @param bufferSupplier The buffer supplier tight to the request thread.
*
- * @return A future yielding the results or an exception.
+ * @return A future yielding the results.
+ * The error codes of the results are set to indicate the errors that occurred during the execution.
*/
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
RequestContext context,
@@ -172,7 +180,8 @@
* @param context The request context.
* @param request The OffsetFetchRequestGroup request.
*
- * @return A future yielding the results or an exception.
+ * @return A future yielding the results.
+ * The error codes of the results are set to indicate the errors that occurred during the execution.
*/
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchOffsets(
RequestContext context,
@@ -186,7 +195,8 @@
* @param context The request context.
* @param request The OffsetFetchRequestGroup request.
*
- * @return A future yielding the results or an exception.
+ * @return A future yielding the results.
+ * The error codes of the results are set to indicate the errors that occurred during the execution.
*/
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffsets(
RequestContext context,
@@ -201,7 +211,8 @@
* @param request The OffsetCommitRequest data.
* @param bufferSupplier The buffer supplier tight to the request thread.
*
- * @return A future yielding the response or an exception.
+ * @return A future yielding the response.
+ * The error code(s) of the response are set to indicate the error(s) that occurred during the execution.
*/
CompletableFuture<OffsetCommitResponseData> commitOffsets(
RequestContext context,
@@ -216,7 +227,8 @@
* @param request The TnxOffsetCommitRequest data.
* @param bufferSupplier The buffer supplier tight to the request thread.
*
- * @return A future yielding the response or an exception.
+ * @return A future yielding the response.
+ * The error code(s) of the response are set to indicate the error(s) that occurred during the execution.
*/
CompletableFuture<TxnOffsetCommitResponseData> commitTransactionalOffsets(
RequestContext context,
@@ -231,7 +243,8 @@
* @param request The OffsetDeleteRequest data.
* @param bufferSupplier The buffer supplier tight to the request thread.
*
- * @return A future yielding the response or an exception.
+ * @return A future yielding the response.
+ * The error code(s) of the response are set to indicate the error(s) that occurred during the execution.
*/
CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
RequestContext context,
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 176dc77..b092316 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -58,6 +58,7 @@
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -268,7 +269,9 @@
ConsumerGroupHeartbeatRequestData request
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ );
}
return runtime.scheduleWriteOperation(
@@ -311,19 +314,21 @@
BufferSupplier bufferSupplier
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(new JoinGroupResponseData()
+ .setMemberId(request.memberId())
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ );
+ }
+
+ if (!isGroupIdNotEmpty(request.groupId())) {
+ return CompletableFuture.completedFuture(new JoinGroupResponseData()
+ .setMemberId(request.memberId())
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ );
}
CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>();
- if (!isGroupIdNotEmpty(request.groupId())) {
- responseFuture.complete(new JoinGroupResponseData()
- .setMemberId(request.memberId())
- .setErrorCode(Errors.INVALID_GROUP_ID.code()));
-
- return responseFuture;
- }
-
runtime.scheduleWriteOperation(
"generic-group-join",
topicPartitionFor(request.groupId()),
@@ -354,12 +359,15 @@
BufferSupplier bufferSupplier
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(new SyncGroupResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ );
}
if (!isGroupIdNotEmpty(request.groupId())) {
return CompletableFuture.completedFuture(new SyncGroupResponseData()
- .setErrorCode(Errors.INVALID_GROUP_ID.code()));
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ );
}
CompletableFuture<SyncGroupResponseData> responseFuture = new CompletableFuture<>();
@@ -393,12 +401,15 @@
HeartbeatRequestData request
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(new HeartbeatResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ );
}
if (!isGroupIdNotEmpty(request.groupId())) {
return CompletableFuture.completedFuture(new HeartbeatResponseData()
- .setErrorCode(Errors.INVALID_GROUP_ID.code()));
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ );
}
// Using a read operation is okay here as we ignore the last committed offset in the snapshot registry.
@@ -432,12 +443,15 @@
LeaveGroupRequestData request
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(new LeaveGroupResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ );
}
if (!isGroupIdNotEmpty(request.groupId())) {
return CompletableFuture.completedFuture(new LeaveGroupResponseData()
- .setErrorCode(Errors.INVALID_GROUP_ID.code()));
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ );
}
return runtime.scheduleWriteOperation(
@@ -477,7 +491,9 @@
ListGroupsRequestData request
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(new ListGroupsResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ );
}
final CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();
@@ -518,7 +534,10 @@
List<String> groupIds
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(DescribeGroupsRequest.getErrorDescribedGroupList(
+ groupIds,
+ Errors.COORDINATOR_NOT_AVAILABLE
+ ));
}
final List<CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>>> futures =
@@ -579,7 +598,10 @@
BufferSupplier bufferSupplier
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(
+ groupIds,
+ Errors.COORDINATOR_NOT_AVAILABLE
+ ));
}
final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures =
@@ -638,12 +660,18 @@
boolean requireStable
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(request.groupId())
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ );
}
// For backwards compatibility, we support fetch commits for the empty group id.
if (request.groupId() == null) {
- return FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception());
+ return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(request.groupId())
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ );
}
// The require stable flag when set tells the broker to hold on returning unstable
@@ -682,12 +710,18 @@
boolean requireStable
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(request.groupId())
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ );
}
// For backwards compatibility, we support fetch commits for the empty group id.
if (request.groupId() == null) {
- return FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception());
+ return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(request.groupId())
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ );
}
// The require stable flag when set tells the broker to hold on returning unstable
@@ -726,7 +760,10 @@
BufferSupplier bufferSupplier
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse(
+ request,
+ Errors.COORDINATOR_NOT_AVAILABLE
+ ));
}
// For backwards compatibility, we support offset commits for the empty groupId.
@@ -756,7 +793,10 @@
BufferSupplier bufferSupplier
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(TxnOffsetCommitRequest.getErrorResponse(
+ request,
+ Errors.COORDINATOR_NOT_AVAILABLE
+ ));
}
return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
@@ -774,7 +814,9 @@
BufferSupplier bufferSupplier
) {
if (!isActive.get()) {
- return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ return CompletableFuture.completedFuture(new OffsetDeleteResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ );
}
if (!isGroupIdNotEmpty(request.groupId())) {
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 9a18db8..e3e13b8 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -87,7 +87,6 @@
import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
-import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -139,7 +138,7 @@
}
@Test
- public void testConsumerGroupHeartbeatWhenNotStarted() {
+ public void testConsumerGroupHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
@@ -150,12 +149,15 @@
ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData()
.setGroupId("foo");
- assertFutureThrows(
- service.consumerGroupHeartbeat(
- requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
- request
- ),
- CoordinatorNotAvailableException.class
+ CompletableFuture<ConsumerGroupHeartbeatResponseData> future = service.consumerGroupHeartbeat(
+ requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+ request
+ );
+
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+ future.get()
);
}
@@ -426,6 +428,31 @@
}
@Test
+ public void testJoinGroupWhenNotStarted() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ JoinGroupRequestData request = new JoinGroupRequestData()
+ .setGroupId("foo");
+
+ CompletableFuture<JoinGroupResponseData> future = service.joinGroup(
+ requestContext(ApiKeys.JOIN_GROUP),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertEquals(
+ new JoinGroupResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+ future.get()
+ );
+ }
+
+ @Test
public void testSyncGroup() {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
@@ -519,6 +546,31 @@
}
@Test
+ public void testSyncGroupWhenNotStarted() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ SyncGroupRequestData request = new SyncGroupRequestData()
+ .setGroupId("foo");
+
+ CompletableFuture<SyncGroupResponseData> future = service.syncGroup(
+ requestContext(ApiKeys.SYNC_GROUP),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertEquals(
+ new SyncGroupResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+ future.get()
+ );
+ }
+
+ @Test
public void testHeartbeat() throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
@@ -615,6 +667,30 @@
}
@Test
+ public void testHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ HeartbeatRequestData request = new HeartbeatRequestData()
+ .setGroupId("foo");
+
+ CompletableFuture<HeartbeatResponseData> future = service.heartbeat(
+ requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+ request
+ );
+
+ assertEquals(
+ new HeartbeatResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+ future.get()
+ );
+ }
+
+ @Test
public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
@@ -757,6 +833,29 @@
}
@Test
+ public void testListGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ ListGroupsRequestData request = new ListGroupsRequestData();
+
+ CompletableFuture<ListGroupsResponseData> future = service.listGroups(
+ requestContext(ApiKeys.LIST_GROUPS),
+ request
+ );
+
+ assertEquals(
+ new ListGroupsResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+ future.get()
+ );
+ }
+
+ @Test
public void testDescribeGroups() throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
@@ -860,6 +959,29 @@
);
}
+ // Verifies that DescribeGroups returns a per-group result carrying
+ // COORDINATOR_NOT_AVAILABLE — not a failed future — when the coordinator
+ // service has not been started.
+ @Test
+ public void testDescribeGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> future = service.describeGroups(
+ requestContext(ApiKeys.DESCRIBE_GROUPS),
+ Collections.singletonList("group-id")
+ );
+
+ // The error is set on each requested group's DescribedGroup entry.
+ assertEquals(
+ Collections.singletonList(new DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id")
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ ),
+ future.get()
+ );
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFetchOffsets(
@@ -915,6 +1037,39 @@
@ParameterizedTest
@ValueSource(booleans = {true, false})
+ // Verifies that OffsetFetch (specific topics) returns a group-level
+ // COORDINATOR_NOT_AVAILABLE error in the response — not a failed future —
+ // when the coordinator service has not been started. Parameterized over
+ // requireStable since both paths must behave identically here.
+ public void testFetchOffsetsWhenNotStarted(
+ boolean requireStable
+ ) throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ OffsetFetchRequestData.OffsetFetchRequestGroup request =
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group")
+ .setTopics(Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(Collections.singletonList(0))));
+
+ CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchOffsets(
+ requestContext(ApiKeys.OFFSET_FETCH),
+ request,
+ requireStable
+ );
+
+ // Group-level error only; no per-topic results are expected.
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group")
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+ future.get()
+ );
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
public void testFetchAllOffsets(
boolean requireStable
) throws ExecutionException, InterruptedException, TimeoutException {
@@ -963,6 +1118,36 @@
assertEquals(response, future.get(5, TimeUnit.SECONDS));
}
+ // Verifies that OffsetFetch for all offsets (no topics set on the request)
+ // returns a group-level COORDINATOR_NOT_AVAILABLE error in the response —
+ // not a failed future — when the coordinator service has not been started.
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testFetchAllOffsetsWhenNotStarted(
+ boolean requireStable
+ ) throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ OffsetFetchRequestData.OffsetFetchRequestGroup request =
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group");
+
+ CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchAllOffsets(
+ requestContext(ApiKeys.OFFSET_FETCH),
+ request,
+ requireStable
+ );
+
+ // Group-level error only; no per-topic results are expected.
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group")
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+ future.get()
+ );
+ }
+
@Test
public void testLeaveGroup() throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
@@ -1047,6 +1232,30 @@
}
@Test
+ // Verifies that LeaveGroup returns a response populated with
+ // COORDINATOR_NOT_AVAILABLE — not a failed future — when the coordinator
+ // service has not been started.
+ public void testLeaveGroupWhenNotStarted() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ LeaveGroupRequestData request = new LeaveGroupRequestData()
+ .setGroupId("foo");
+
+ CompletableFuture<LeaveGroupResponseData> future = service.leaveGroup(
+ requestContext(ApiKeys.LEAVE_GROUP),
+ request
+ );
+
+ // future.get() must not throw; the error is carried in the response data.
+ assertEquals(
+ new LeaveGroupResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+ future.get()
+ );
+ }
+
+ @Test
public void testDeleteOffsets() throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
@@ -1181,6 +1390,31 @@
}
@Test
+ // Verifies that OffsetDelete returns a response whose top-level error is
+ // COORDINATOR_NOT_AVAILABLE — not a failed future — when the coordinator
+ // service has not been started. This pairs with the OffsetDeleteResponse
+ // builder change that lets a top-level error replace accumulated data.
+ public void testDeleteOffsetsWhenNotStarted() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+ .setGroupId("foo");
+
+ CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ // future.get() must not throw; the error is carried in the response data.
+ assertEquals(
+ new OffsetDeleteResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+ future.get()
+ );
+ }
+
+ @Test
public void testDeleteGroups() throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
@@ -1287,4 +1521,30 @@
future.get()
);
}
+
+ // Verifies that DeleteGroups returns a per-group result collection carrying
+ // COORDINATOR_NOT_AVAILABLE — not a failed future — when the coordinator
+ // service has not been started.
+ @Test
+ public void testDeleteGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = service.deleteGroups(
+ requestContext(ApiKeys.DELETE_GROUPS),
+ Collections.singletonList("foo"),
+ BufferSupplier.NO_CACHING
+ );
+
+ // The error is set on each requested group's DeletableGroupResult entry.
+ assertEquals(
+ new DeleteGroupsResponseData.DeletableGroupResultCollection(
+ Collections.singletonList(new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("foo")
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ ).iterator()
+ ),
+ future.get()
+ );
+ }
}