| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package kafka.coordinator.group |
| |
| import kafka.common.OffsetAndMetadata |
| import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback} |
| import kafka.server.RequestLocal |
| import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} |
| import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException} |
| import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData} |
| import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol |
| import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember |
| import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection} |
| import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection} |
| import org.apache.kafka.common.network.{ClientInformation, ListenerName} |
| import org.apache.kafka.common.protocol.{ApiKeys, Errors} |
| import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, RequestHeader} |
| import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} |
| import org.apache.kafka.common.utils.{BufferSupplier, Time} |
| import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource |
| import org.apache.kafka.server.util.MockTime |
| import org.apache.kafka.test.TestUtils.assertFutureThrows |
| import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} |
| import org.junit.jupiter.api.Test |
| import org.junit.jupiter.params.ParameterizedTest |
| import org.mockito.{ArgumentCaptor, ArgumentMatchers} |
| import org.mockito.Mockito.{mock, verify, when} |
| |
| import java.net.InetAddress |
| import java.util.Optional |
| import scala.jdk.CollectionConverters._ |
| |
| class GroupCoordinatorAdapterTest { |
| |
/**
 * Builds the [[RequestContext]] shared by the tests in this class.
 *
 * Only the API key and version vary; every other value (the "client" header string,
 * the "1" connection string, the local host address, the anonymous principal and the
 * PLAINTEXT listener/protocol) is fixed.
 *
 * @param apiKey     API key placed in the request header
 * @param apiVersion API version placed in the request header
 * @return a freshly constructed request context
 */
private def makeContext(
  apiKey: ApiKeys,
  apiVersion: Short
): RequestContext = {
  val header = new RequestHeader(apiKey, apiVersion, "client", 0)
  val protocol = SecurityProtocol.PLAINTEXT
  val listener = ListenerName.forSecurityProtocol(protocol)

  new RequestContext(
    header,
    "1",
    InetAddress.getLocalHost,
    KafkaPrincipal.ANONYMOUS,
    listener,
    protocol,
    ClientInformation.EMPTY,
    false
  )
}
| |
/**
 * Verifies that `consumerGroupHeartbeat` returns a future that is already completed
 * exceptionally with an [[UnsupportedVersionException]].
 */
@Test
def testJoinConsumerGroup(): Unit = {
  val adapter = new GroupCoordinatorAdapter(mock(classOf[GroupCoordinator]), Time.SYSTEM)
  val heartbeatApi = ApiKeys.CONSUMER_GROUP_HEARTBEAT

  val response = adapter.consumerGroupHeartbeat(
    makeContext(heartbeatApi, heartbeatApi.latestVersion),
    new ConsumerGroupHeartbeatRequestData().setGroupId("group")
  )

  // The failure must be visible immediately, without waiting on the mocked coordinator.
  assertTrue(response.isDone)
  assertTrue(response.isCompletedExceptionally)
  assertFutureThrows(response, classOf[UnsupportedVersionException])
}
| |
/**
 * Exercises `joinGroup` for every JoinGroup version: checks the arguments handed to
 * `GroupCoordinator.handleJoinGroup`, then fires the captured callback and checks that
 * the future completes with the matching `JoinGroupResponseData`.
 *
 * @param version JoinGroup API version supplied by `@ApiKeyVersionsSource`
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
def testJoinGroup(version: Short): Unit = {
  val coordinator = mock(classOf[GroupCoordinator])
  val adapter = new GroupCoordinatorAdapter(coordinator, Time.SYSTEM)
  val context = makeContext(ApiKeys.JOIN_GROUP, version)

  // Each protocol uses its own name as metadata so the captured bytes are easy to compare.
  val protocolNames = List("first", "second")
  val requestProtocols = protocolNames.map { name =>
    new JoinGroupRequestProtocol()
      .setName(name)
      .setMetadata(name.getBytes())
  }
  val joinRequest = new JoinGroupRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setProtocolType("consumer")
    .setRebalanceTimeoutMs(1000)
    .setSessionTimeoutMs(2000)
    .setReason("reason")
    .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
      requestProtocols.iterator.asJava))
  val buffers = BufferSupplier.create()

  val response = adapter.joinGroup(context, joinRequest, buffers)
  // The coordinator is a mock, so the future stays pending until the callback is fired below.
  assertFalse(response.isDone)

  val protocolsCaptor: ArgumentCaptor[List[(String, Array[Byte])]] =
    ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
  val callbackCaptor: ArgumentCaptor[JoinGroupCallback] =
    ArgumentCaptor.forClass(classOf[JoinGroupCallback])

  // Matchers are created inline, in argument order, as Mockito requires.
  verify(coordinator).handleJoinGroup(
    ArgumentMatchers.eq(joinRequest.groupId),
    ArgumentMatchers.eq(joinRequest.memberId),
    ArgumentMatchers.eq(None),
    // The next two flags are expected to be true from version 4 and version 9 onwards.
    ArgumentMatchers.eq(version >= 4),
    ArgumentMatchers.eq(version >= 9),
    ArgumentMatchers.eq(context.clientId),
    ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
    ArgumentMatchers.eq(joinRequest.rebalanceTimeoutMs),
    ArgumentMatchers.eq(joinRequest.sessionTimeoutMs),
    ArgumentMatchers.eq(joinRequest.protocolType),
    protocolsCaptor.capture(),
    callbackCaptor.capture(),
    ArgumentMatchers.eq(Some("reason")),
    ArgumentMatchers.eq(RequestLocal(buffers))
  )

  // Byte arrays have reference equality, so compare the metadata as strings.
  val expectedProtocols = protocolNames.map(name => (name, name))
  val forwardedProtocols = protocolsCaptor.getValue.map { case (name, metadata) =>
    (name, new String(metadata))
  }
  assertEquals(expectedProtocols, forwardedProtocols)

  // Builds a fresh member each time so the callback input and the expectation share no state.
  def newMember(): JoinGroupResponseMember =
    new JoinGroupResponseMember()
      .setMemberId("member")
      .setMetadata("member".getBytes())
      .setGroupInstanceId("instance")

  val coordinatorResult = JoinGroupResult(
    members = List(newMember()),
    memberId = "member",
    generationId = 10,
    protocolType = Some("consumer"),
    protocolName = Some("range"),
    leaderId = "leader",
    skipAssignment = true,
    error = Errors.UNKNOWN_MEMBER_ID
  )
  callbackCaptor.getValue.apply(coordinatorResult)

  val expectedResponse = new JoinGroupResponseData()
    .setMembers(List(newMember()).asJava)
    .setMemberId("member")
    .setGenerationId(10)
    .setProtocolType("consumer")
    .setProtocolName("range")
    .setLeader("leader")
    .setSkipAssignment(true)
    .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code)

  assertTrue(response.isDone)
  assertEquals(expectedResponse, response.get())
}
| |
/**
 * Exercises `syncGroup` for every SyncGroup version: checks the arguments handed to
 * `GroupCoordinator.handleSyncGroup`, then fires the captured callback and checks that
 * the future completes with the matching `SyncGroupResponseData`.
 *
 * @param version SyncGroup API version supplied by `@ApiKeyVersionsSource`
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
def testSyncGroup(version: Short): Unit = {
  val coordinator = mock(classOf[GroupCoordinator])
  val adapter = new GroupCoordinatorAdapter(coordinator, Time.SYSTEM)
  val context = makeContext(ApiKeys.SYNC_GROUP, version)

  // Every member is assigned its own id as payload so the captured bytes are easy to compare.
  val memberIds = List("member1", "member2")
  val requestAssignments = memberIds.map { id =>
    new SyncGroupRequestData.SyncGroupRequestAssignment()
      .setMemberId(id)
      .setAssignment(id.getBytes())
  }
  val syncRequest = new SyncGroupRequestData()
    .setGroupId("group")
    .setMemberId("member1")
    .setGroupInstanceId("instance")
    .setProtocolType("consumer")
    .setProtocolName("range")
    .setGenerationId(10)
    .setAssignments(requestAssignments.asJava)
  val buffers = BufferSupplier.create()

  val response = adapter.syncGroup(context, syncRequest, buffers)
  // The coordinator is a mock, so the future stays pending until the callback is fired below.
  assertFalse(response.isDone)

  val assignmentCaptor: ArgumentCaptor[Map[String, Array[Byte]]] =
    ArgumentCaptor.forClass(classOf[Map[String, Array[Byte]]])
  val callbackCaptor: ArgumentCaptor[SyncGroupCallback] =
    ArgumentCaptor.forClass(classOf[SyncGroupCallback])

  // Matchers are created inline, in argument order, as Mockito requires.
  verify(coordinator).handleSyncGroup(
    ArgumentMatchers.eq(syncRequest.groupId),
    ArgumentMatchers.eq(syncRequest.generationId),
    ArgumentMatchers.eq(syncRequest.memberId),
    ArgumentMatchers.eq(Some(syncRequest.protocolType)),
    ArgumentMatchers.eq(Some(syncRequest.protocolName)),
    ArgumentMatchers.eq(Some(syncRequest.groupInstanceId)),
    assignmentCaptor.capture(),
    callbackCaptor.capture(),
    ArgumentMatchers.eq(RequestLocal(buffers))
  )

  // Byte arrays have reference equality, so compare the assignments as strings.
  val expectedAssignment = memberIds.map(id => id -> id).toMap
  val forwardedAssignment = assignmentCaptor.getValue.map { case (member, metadata) =>
    (member, new String(metadata))
  }
  assertEquals(expectedAssignment, forwardedAssignment)

  val coordinatorResult = SyncGroupResult(
    error = Errors.NONE,
    protocolType = Some("consumer"),
    protocolName = Some("range"),
    memberAssignment = "member1".getBytes()
  )
  callbackCaptor.getValue.apply(coordinatorResult)

  val expectedResponse = new SyncGroupResponseData()
    .setErrorCode(Errors.NONE.code)
    .setProtocolType("consumer")
    .setProtocolName("range")
    .setAssignment("member1".getBytes())

  assertTrue(response.isDone)
  assertEquals(expectedResponse, response.get())
}
| |
/**
 * Verifies that `heartbeat` hands the group id, member id and generation id to
 * `GroupCoordinator.handleHeartbeat`, and that the future completes with a default
 * `HeartbeatResponseData` once the captured callback receives `Errors.NONE`.
 */
@Test
def testHeartbeat(): Unit = {
  val coordinator = mock(classOf[GroupCoordinator])
  val adapter = new GroupCoordinatorAdapter(coordinator, Time.SYSTEM)

  val context = makeContext(ApiKeys.HEARTBEAT, ApiKeys.HEARTBEAT.latestVersion)
  val heartbeatRequest = new HeartbeatRequestData()
    .setGroupId("group")
    .setMemberId("member1")
    .setGenerationId(0)

  val response = adapter.heartbeat(context, heartbeatRequest)

  val callbackCaptor: ArgumentCaptor[Errors => Unit] =
    ArgumentCaptor.forClass(classOf[Errors => Unit])

  // Matchers are created inline, in argument order, as Mockito requires.
  verify(coordinator).handleHeartbeat(
    ArgumentMatchers.eq(heartbeatRequest.groupId),
    ArgumentMatchers.eq(heartbeatRequest.memberId),
    ArgumentMatchers.eq(None),
    ArgumentMatchers.eq(heartbeatRequest.generationId),
    callbackCaptor.capture()
  )

  // Still pending: the mocked coordinator has not invoked the callback yet.
  assertFalse(response.isDone)

  callbackCaptor.getValue.apply(Errors.NONE)

  assertTrue(response.isDone)
  val expectedResponse = new HeartbeatResponseData()
  assertEquals(expectedResponse, response.get())
}
| |
/**
 * Verifies that `leaveGroup` forwards the group id and member identities to
 * `GroupCoordinator.handleLeaveGroup`, and that the `LeaveGroupResult` passed to the
 * captured callback is converted into the expected `LeaveGroupResponseData`.
 *
 * The `@Test` annotation is required: without it JUnit does not discover this method
 * and the leave-group path goes unexercised.
 */
@Test
def testLeaveGroup(): Unit = {
  val groupCoordinator = mock(classOf[GroupCoordinator])
  val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)

  val ctx = makeContext(ApiKeys.LEAVE_GROUP, ApiKeys.LEAVE_GROUP.latestVersion)
  val data = new LeaveGroupRequestData()
    .setGroupId("group")
    .setMembers(List(
      new LeaveGroupRequestData.MemberIdentity()
        .setMemberId("member-1")
        .setGroupInstanceId("instance-1"),
      new LeaveGroupRequestData.MemberIdentity()
        .setMemberId("member-2")
        .setGroupInstanceId("instance-2")
    ).asJava)

  val future = adapter.leaveGroup(ctx, data)

  val capturedCallback: ArgumentCaptor[LeaveGroupResult => Unit] =
    ArgumentCaptor.forClass(classOf[LeaveGroupResult => Unit])

  // Matchers are created inline, in argument order, as Mockito requires.
  verify(groupCoordinator).handleLeaveGroup(
    ArgumentMatchers.eq(data.groupId),
    ArgumentMatchers.eq(data.members.asScala.toList),
    capturedCallback.capture()
  )

  // Still pending: the mocked coordinator has not invoked the callback yet.
  assertFalse(future.isDone)

  capturedCallback.getValue.apply(LeaveGroupResult(
    topLevelError = Errors.NONE,
    memberResponses = List(
      LeaveMemberResponse(
        memberId = "member-1",
        groupInstanceId = Some("instance-1"),
        error = Errors.NONE
      ),
      LeaveMemberResponse(
        memberId = "member-2",
        groupInstanceId = Some("instance-2"),
        error = Errors.NONE
      )
    )
  ))

  // Error codes are left at their defaults; every error fed to the callback is Errors.NONE.
  val expectedData = new LeaveGroupResponseData()
    .setMembers(List(
      new LeaveGroupResponseData.MemberResponse()
        .setMemberId("member-1")
        .setGroupInstanceId("instance-1"),
      new LeaveGroupResponseData.MemberResponse()
        .setMemberId("member-2")
        .setGroupInstanceId("instance-2")
    ).asJava)

  assertTrue(future.isDone)
  assertEquals(expectedData, future.get())
}
| |
/**
 * Runs the parameterised `testListGroups` helper for a null filter, an empty filter
 * and a single-state filter, each paired with the filter set the coordinator should see.
 */
@Test
def testListGroups(): Unit = {
  val scenarios: List[(List[String], Set[String])] = List(
    (null, Set.empty),
    (List(), Set.empty),
    (List("Stable"), Set("Stable"))
  )

  scenarios.foreach { case (requestFilter, coordinatorFilter) =>
    testListGroups(requestFilter, coordinatorFilter)
  }
}
| |
/**
 * Verifies `listGroups` for a single states filter: the mocked coordinator is stubbed
 * for `expectedStatesFilter` and the adapter's response must carry the stubbed error
 * code and groups.
 *
 * @param statesFilter         filter set on the request; the caller in this class also passes null
 * @param expectedStatesFilter filter the coordinator is expected to be called with
 */
def testListGroups(
  statesFilter: List[String],
  expectedStatesFilter: Set[String]
): Unit = {
  val coordinator = mock(classOf[GroupCoordinator])
  val adapter = new GroupCoordinatorAdapter(coordinator, Time.SYSTEM)

  val context = makeContext(ApiKeys.LIST_GROUPS, ApiKeys.LIST_GROUPS.latestVersion)
  val listRequest = new ListGroupsRequestData()
    .setStatesFilter(statesFilter.asJava)

  val stubbedGroups = List(
    GroupOverview("group1", "protocol1", "Stable"),
    GroupOverview("group2", "qwerty", "Empty")
  )
  when(coordinator.handleListGroups(expectedStatesFilter))
    .thenReturn((Errors.NOT_COORDINATOR, stubbedGroups))

  val response = adapter.listGroups(context, listRequest)
  assertTrue(response.isDone)

  val listedGroups = List(
    new ListGroupsResponseData.ListedGroup()
      .setGroupId("group1")
      .setGroupState("Stable")
      .setProtocolType("protocol1"),
    new ListGroupsResponseData.ListedGroup()
      .setGroupId("group2")
      .setGroupState("Empty")
      .setProtocolType("qwerty")
  )
  val expectedResponse = new ListGroupsResponseData()
    .setErrorCode(Errors.NOT_COORDINATOR.code)
    .setGroups(listedGroups.asJava)

  assertTrue(response.isDone)
  assertEquals(expectedResponse, response.get())
}
| |
/**
 * Verifies `describeGroups` for two groups: one the mocked coordinator describes
 * successfully and one it answers with `NOT_COORDINATOR` and the empty group summary.
 */
@Test
def testDescribeGroup(): Unit = {
  val coordinator = mock(classOf[GroupCoordinator])
  val adapter = new GroupCoordinatorAdapter(coordinator, Time.SYSTEM)

  val describedId = "group-1"
  val failingId = "group-2"

  val memberSummary = MemberSummary(
    "memberid",
    Some("instanceid"),
    "clientid",
    "clienthost",
    "metadata".getBytes(),
    "assignment".getBytes()
  )
  val summary = GroupSummary("Stable", "consumer", "roundrobin", List(memberSummary))

  when(coordinator.handleDescribeGroup(describedId))
    .thenReturn((Errors.NONE, summary))
  when(coordinator.handleDescribeGroup(failingId))
    .thenReturn((Errors.NOT_COORDINATOR, GroupCoordinator.EmptyGroup))

  val context = makeContext(ApiKeys.DESCRIBE_GROUPS, ApiKeys.DESCRIBE_GROUPS.latestVersion)
  val response = adapter.describeGroups(context, List(describedId, failingId).asJava)
  assertTrue(response.isDone)

  val firstMember = summary.members.head
  val expectedMember = new DescribeGroupsResponseData.DescribedGroupMember()
    .setMemberId(firstMember.memberId)
    .setGroupInstanceId(firstMember.groupInstanceId.orNull)
    .setClientId(firstMember.clientId)
    .setClientHost(firstMember.clientHost)
    .setMemberMetadata(firstMember.metadata)
    .setMemberAssignment(firstMember.assignment)

  val expectedSuccess = new DescribeGroupsResponseData.DescribedGroup()
    .setGroupId(describedId)
    .setErrorCode(Errors.NONE.code)
    .setProtocolType(summary.protocolType)
    .setProtocolData(summary.protocol)
    .setGroupState(summary.state)
    .setMembers(List(expectedMember).asJava)

  // Only the id and error code are set for the failing group.
  val expectedFailure = new DescribeGroupsResponseData.DescribedGroup()
    .setGroupId(failingId)
    .setErrorCode(Errors.NOT_COORDINATOR.code)

  assertEquals(List(expectedSuccess, expectedFailure).asJava, response.get())
}
| |
/**
 * Verifies that `deleteGroups` turns the per-group errors returned by the mocked
 * `GroupCoordinator.handleDeleteGroups` into a `DeletableGroupResultCollection`.
 */
@Test
def testDeleteGroups(): Unit = {
  val coordinator = mock(classOf[GroupCoordinator])
  val adapter = new GroupCoordinatorAdapter(coordinator, Time.SYSTEM)

  val context = makeContext(ApiKeys.DELETE_GROUPS, ApiKeys.DELETE_GROUPS.latestVersion)

  // Single table driving the request, the coordinator stub and the expected results.
  val outcomes: List[(String, Errors)] = List(
    "group-1" -> Errors.NONE,
    "group-2" -> Errors.NOT_COORDINATOR,
    "group-3" -> Errors.INVALID_GROUP_ID
  )
  val groupIds = outcomes.map { case (groupId, _) => groupId }
  val buffers = BufferSupplier.create()

  when(coordinator.handleDeleteGroups(groupIds.toSet, RequestLocal(buffers)))
    .thenReturn(outcomes.toMap)

  val response = adapter.deleteGroups(context, groupIds.asJava, buffers)
  assertTrue(response.isDone)

  val expectedResults = new DeleteGroupsResponseData.DeletableGroupResultCollection()
  outcomes.foreach { case (groupId, error) =>
    expectedResults.add(new DeleteGroupsResponseData.DeletableGroupResult()
      .setGroupId(groupId)
      .setErrorCode(error.code))
  }

  assertEquals(expectedResults, response.get())
}
| |
/**
 * Verifies that `fetchAllOffsets` calls `GroupCoordinator.handleFetchOffsets` with no
 * partition list and regroups the returned per-partition data into per-topic responses.
 */
@Test
def testFetchAllOffsets(): Unit = {
  val foo0 = new TopicPartition("foo", 0)
  val foo1 = new TopicPartition("foo", 1)
  val bar1 = new TopicPartition("bar", 1)

  val coordinator = mock(classOf[GroupCoordinator])
  val adapter = new GroupCoordinatorAdapter(coordinator, Time.SYSTEM)

  val foo0Data = new OffsetFetchResponse.PartitionData(
    100,
    Optional.of(1),
    "foo",
    Errors.NONE
  )
  val bar1Data = new OffsetFetchResponse.PartitionData(
    -1,
    Optional.empty[Integer],
    "",
    Errors.UNKNOWN_TOPIC_OR_PARTITION
  )
  val foo1Data = new OffsetFetchResponse.PartitionData(
    200,
    Optional.empty[Integer],
    "",
    Errors.NONE
  )

  when(coordinator.handleFetchOffsets("group", true, None))
    .thenReturn((Errors.NONE, Map(foo0 -> foo0Data, bar1 -> bar1Data, foo1 -> foo1Data)))

  val context = makeContext(ApiKeys.OFFSET_FETCH, ApiKeys.OFFSET_FETCH.latestVersion)
  val response = adapter.fetchAllOffsets(context, "group", true)

  assertTrue(response.isDone)

  // Builds one expected partition entry of the response.
  def partitionEntry(
    tp: TopicPartition,
    offset: Long,
    leaderEpoch: Int,
    metadata: String,
    error: Errors
  ): OffsetFetchResponseData.OffsetFetchResponsePartitions =
    new OffsetFetchResponseData.OffsetFetchResponsePartitions()
      .setPartitionIndex(tp.partition)
      .setCommittedOffset(offset)
      .setCommittedLeaderEpoch(leaderEpoch)
      .setMetadata(metadata)
      .setErrorCode(error.code)

  // Builds one expected topic entry holding the given partitions.
  def topicEntry(
    name: String,
    partitions: List[OffsetFetchResponseData.OffsetFetchResponsePartitions]
  ): OffsetFetchResponseData.OffsetFetchResponseTopics =
    new OffsetFetchResponseData.OffsetFetchResponseTopics()
      .setName(name)
      .setPartitions(partitions.asJava)

  // A leader epoch of -1 is expected where the stubbed data carried no epoch.
  val expectedTopics = List(
    topicEntry(foo0.topic, List(
      partitionEntry(foo0, 100, 1, "foo", Errors.NONE),
      partitionEntry(foo1, 200, -1, "", Errors.NONE)
    )),
    topicEntry(bar1.topic, List(
      partitionEntry(bar1, -1, -1, "", Errors.UNKNOWN_TOPIC_OR_PARTITION)
    ))
  )

  // Sort both sides by descending topic name so the comparison ignores topic order.
  val byNameDescending = Ordering[String].reverse
  assertEquals(
    expectedTopics.sortBy(_.name)(byNameDescending),
    response.get().asScala.toList.sortBy(_.name)(byNameDescending)
  )
}
| |
/**
 * Verifies that `fetchOffsets` flattens the requested topics into a partition list for
 * `GroupCoordinator.handleFetchOffsets` and regroups the returned per-partition data
 * into per-topic responses.
 */
@Test
def testFetchOffsets(): Unit = {
  val foo0 = new TopicPartition("foo", 0)
  val foo1 = new TopicPartition("foo", 1)
  val bar1 = new TopicPartition("bar", 1)

  val coordinator = mock(classOf[GroupCoordinator])
  val adapter = new GroupCoordinatorAdapter(coordinator, Time.SYSTEM)

  val foo0Data = new OffsetFetchResponse.PartitionData(
    100,
    Optional.of(1),
    "foo",
    Errors.NONE
  )
  val bar1Data = new OffsetFetchResponse.PartitionData(
    -1,
    Optional.empty[Integer],
    "",
    Errors.UNKNOWN_TOPIC_OR_PARTITION
  )
  val foo1Data = new OffsetFetchResponse.PartitionData(
    200,
    Optional.empty[Integer],
    "",
    Errors.NONE
  )

  when(coordinator.handleFetchOffsets("group", true, Some(Seq(foo0, foo1, bar1))))
    .thenReturn((Errors.NONE, Map(foo0 -> foo0Data, bar1 -> bar1Data, foo1 -> foo1Data)))

  val requestedTopics = List(
    new OffsetFetchRequestData.OffsetFetchRequestTopics()
      .setName(foo0.topic)
      .setPartitionIndexes(List[Integer](foo0.partition, foo1.partition).asJava),
    new OffsetFetchRequestData.OffsetFetchRequestTopics()
      .setName(bar1.topic)
      .setPartitionIndexes(List[Integer](bar1.partition).asJava)
  )

  val context = makeContext(ApiKeys.OFFSET_FETCH, ApiKeys.OFFSET_FETCH.latestVersion)
  val response = adapter.fetchOffsets(context, "group", requestedTopics.asJava, true)

  assertTrue(response.isDone)

  // Builds one expected partition entry of the response.
  def partitionEntry(
    tp: TopicPartition,
    offset: Long,
    leaderEpoch: Int,
    metadata: String,
    error: Errors
  ): OffsetFetchResponseData.OffsetFetchResponsePartitions =
    new OffsetFetchResponseData.OffsetFetchResponsePartitions()
      .setPartitionIndex(tp.partition)
      .setCommittedOffset(offset)
      .setCommittedLeaderEpoch(leaderEpoch)
      .setMetadata(metadata)
      .setErrorCode(error.code)

  // Builds one expected topic entry holding the given partitions.
  def topicEntry(
    name: String,
    partitions: List[OffsetFetchResponseData.OffsetFetchResponsePartitions]
  ): OffsetFetchResponseData.OffsetFetchResponseTopics =
    new OffsetFetchResponseData.OffsetFetchResponseTopics()
      .setName(name)
      .setPartitions(partitions.asJava)

  // A leader epoch of -1 is expected where the stubbed data carried no epoch.
  val expectedTopics = List(
    topicEntry(foo0.topic, List(
      partitionEntry(foo0, 100, 1, "foo", Errors.NONE),
      partitionEntry(foo1, 200, -1, "", Errors.NONE)
    )),
    topicEntry(bar1.topic, List(
      partitionEntry(bar1, -1, -1, "", Errors.UNKNOWN_TOPIC_OR_PARTITION)
    ))
  )

  // Sort both sides by descending topic name so the comparison ignores topic order.
  val byNameDescending = Ordering[String].reverse
  assertEquals(
    expectedTopics.sortBy(_.name)(byNameDescending),
    response.get().asScala.toList.sortBy(_.name)(byNameDescending)
  )
}
| |
/**
 * Exercises `commitOffsets` for every OffsetCommit version: checks the offsets map handed
 * to `GroupCoordinator.handleCommitOffsets`, then fires the captured callback and checks
 * that the future completes with the matching `OffsetCommitResponseData`.
 *
 * @param version OffsetCommit API version supplied by `@ApiKeyVersionsSource`
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
def testCommitOffsets(version: Short): Unit = {
  val coordinator = mock(classOf[GroupCoordinator])
  // MockTime keeps the clock fixed so the expected timestamps can be computed exactly.
  val clock = new MockTime()
  val adapter = new GroupCoordinatorAdapter(coordinator, clock)
  val now = clock.milliseconds()

  val context = makeContext(ApiKeys.OFFSET_COMMIT, version)

  val requestPartition = new OffsetCommitRequestData.OffsetCommitRequestPartition()
    .setPartitionIndex(0)
    .setCommittedOffset(100)
    .setCommitTimestamp(now)
    .setCommittedLeaderEpoch(1)
  val requestTopic = new OffsetCommitRequestData.OffsetCommitRequestTopic()
    .setName("foo")
    .setPartitions(List(requestPartition).asJava)
  val commitRequest = new OffsetCommitRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setGenerationId(10)
    .setRetentionTimeMs(1000)
    .setTopics(List(requestTopic).asJava)
  val buffers = BufferSupplier.create()

  val response = adapter.commitOffsets(context, commitRequest, buffers)
  // The coordinator is a mock, so the future stays pending until the callback is fired below.
  assertFalse(response.isDone)

  val callbackCaptor: ArgumentCaptor[Map[TopicIdPartition, Errors] => Unit] =
    ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, Errors] => Unit])

  val fooPartition = new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo")
  // The expiry is expected to be the current time plus the 1000 ms retention set on the request.
  val expectedOffset = new OffsetAndMetadata(
    offset = 100,
    leaderEpoch = Optional.of[Integer](1),
    metadata = "",
    commitTimestamp = now,
    expireTimestamp = Some(now + 1000L)
  )

  // Matchers are created inline, in argument order, as Mockito requires.
  verify(coordinator).handleCommitOffsets(
    ArgumentMatchers.eq(commitRequest.groupId),
    ArgumentMatchers.eq(commitRequest.memberId),
    ArgumentMatchers.eq(None),
    ArgumentMatchers.eq(commitRequest.generationId),
    ArgumentMatchers.eq(Map(fooPartition -> expectedOffset)),
    callbackCaptor.capture(),
    ArgumentMatchers.eq(RequestLocal(buffers))
  )

  callbackCaptor.getValue.apply(Map(
    new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> Errors.NONE
  ))

  val responsePartition = new OffsetCommitResponseData.OffsetCommitResponsePartition()
    .setPartitionIndex(0)
    .setErrorCode(Errors.NONE.code)
  val responseTopic = new OffsetCommitResponseData.OffsetCommitResponseTopic()
    .setName("foo")
    .setPartitions(List(responsePartition).asJava)
  val expectedResponse = new OffsetCommitResponseData()
    .setTopics(List(responseTopic).asJava)

  assertTrue(response.isDone)
  assertEquals(expectedResponse, response.get())
}
| |
/**
 * Verifies that `commitTransactionalOffsets` forwards the producer, member and offset
 * details to `GroupCoordinator.handleTxnCommitOffsets`, and that the callback result is
 * converted into the expected `TxnOffsetCommitResponseData`.
 */
@Test
def testCommitTransactionalOffsets(): Unit = {
  val coordinator = mock(classOf[GroupCoordinator])
  // MockTime keeps the clock fixed so the expected commit timestamp can be computed exactly.
  val clock = new MockTime()
  val adapter = new GroupCoordinatorAdapter(coordinator, clock)
  val now = clock.milliseconds()

  val context = makeContext(ApiKeys.TXN_OFFSET_COMMIT, ApiKeys.TXN_OFFSET_COMMIT.latestVersion)

  val requestPartition = new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
    .setPartitionIndex(0)
    .setCommittedOffset(100)
    .setCommittedLeaderEpoch(1)
  val requestTopic = new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
    .setName("foo")
    .setPartitions(List(requestPartition).asJava)
  val commitRequest = new TxnOffsetCommitRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setGenerationId(10)
    .setProducerEpoch(1)
    .setProducerId(2)
    .setTransactionalId("transaction-id")
    .setTopics(List(requestTopic).asJava)
  val buffers = BufferSupplier.create()

  val response = adapter.commitTransactionalOffsets(context, commitRequest, buffers)
  // The coordinator is a mock, so the future stays pending until the callback is fired below.
  assertFalse(response.isDone)

  val callbackCaptor: ArgumentCaptor[Map[TopicIdPartition, Errors] => Unit] =
    ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, Errors] => Unit])

  val fooPartition = new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo")
  // No expiry timestamp is expected for the transactional commit.
  val expectedOffset = new OffsetAndMetadata(
    offset = 100,
    leaderEpoch = Optional.of[Integer](1),
    metadata = "",
    commitTimestamp = now,
    expireTimestamp = None
  )

  // Matchers are created inline, in argument order, as Mockito requires.
  verify(coordinator).handleTxnCommitOffsets(
    ArgumentMatchers.eq(commitRequest.groupId),
    ArgumentMatchers.eq(commitRequest.producerId),
    ArgumentMatchers.eq(commitRequest.producerEpoch),
    ArgumentMatchers.eq(commitRequest.memberId),
    ArgumentMatchers.eq(None),
    ArgumentMatchers.eq(commitRequest.generationId),
    ArgumentMatchers.eq(Map(fooPartition -> expectedOffset)),
    callbackCaptor.capture(),
    ArgumentMatchers.eq(RequestLocal(buffers))
  )

  callbackCaptor.getValue.apply(Map(
    new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> Errors.NONE
  ))

  val responsePartition = new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
    .setPartitionIndex(0)
    .setErrorCode(Errors.NONE.code)
  val responseTopic = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
    .setName("foo")
    .setPartitions(List(responsePartition).asJava)
  val expectedResponse = new TxnOffsetCommitResponseData()
    .setTopics(List(responseTopic).asJava)

  assertTrue(response.isDone)
  assertEquals(expectedResponse, response.get())
}
| |
/**
 * Verifies that `deleteOffsets` flattens the requested topics into a partition list for
 * `GroupCoordinator.handleDeleteOffsets` and regroups the per-partition errors into an
 * `OffsetDeleteResponseData` organised by topic.
 *
 * The `@Test` annotation is required: without it JUnit does not discover this method
 * and the successful delete-offsets path goes unexercised.
 */
@Test
def testDeleteOffsets(): Unit = {
  val groupCoordinator = mock(classOf[GroupCoordinator])
  val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)

  val foo0 = new TopicPartition("foo", 0)
  val foo1 = new TopicPartition("foo", 1)
  val bar0 = new TopicPartition("bar", 0)
  val bar1 = new TopicPartition("bar", 1)

  val ctx = makeContext(ApiKeys.OFFSET_DELETE, ApiKeys.OFFSET_DELETE.latestVersion)
  val data = new OffsetDeleteRequestData()
    .setGroupId("group")
    .setTopics(new OffsetDeleteRequestTopicCollection(List(
      new OffsetDeleteRequestTopic()
        .setName("foo")
        .setPartitions(List(
          new OffsetDeleteRequestPartition().setPartitionIndex(0),
          new OffsetDeleteRequestPartition().setPartitionIndex(1)
        ).asJava),
      new OffsetDeleteRequestTopic()
        .setName("bar")
        .setPartitions(List(
          new OffsetDeleteRequestPartition().setPartitionIndex(0),
          new OffsetDeleteRequestPartition().setPartitionIndex(1)
        ).asJava)
    ).asJava.iterator))
  val bufferSupplier = BufferSupplier.create()

  // No group-level error; the "bar" partitions fail individually.
  when(groupCoordinator.handleDeleteOffsets(
    data.groupId,
    Seq(foo0, foo1, bar0, bar1),
    RequestLocal(bufferSupplier)
  )).thenReturn((
    Errors.NONE,
    Map(
      foo0 -> Errors.NONE,
      foo1 -> Errors.NONE,
      bar0 -> Errors.GROUP_SUBSCRIBED_TO_TOPIC,
      bar1 -> Errors.GROUP_SUBSCRIBED_TO_TOPIC
    )
  ))

  val future = adapter.deleteOffsets(ctx, data, bufferSupplier)

  val expectedData = new OffsetDeleteResponseData()
    .setTopics(new OffsetDeleteResponseTopicCollection(List(
      new OffsetDeleteResponseTopic()
        .setName("foo")
        .setPartitions(new OffsetDeleteResponsePartitionCollection(List(
          new OffsetDeleteResponsePartition()
            .setPartitionIndex(0)
            .setErrorCode(Errors.NONE.code),
          new OffsetDeleteResponsePartition()
            .setPartitionIndex(1)
            .setErrorCode(Errors.NONE.code)
        ).asJava.iterator)),
      new OffsetDeleteResponseTopic()
        .setName("bar")
        .setPartitions(new OffsetDeleteResponsePartitionCollection(List(
          new OffsetDeleteResponsePartition()
            .setPartitionIndex(0)
            .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code),
          new OffsetDeleteResponsePartition()
            .setPartitionIndex(1)
            .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code)
        ).asJava.iterator))
    ).asJava.iterator))

  assertTrue(future.isDone)
  assertEquals(expectedData, future.get())
}
| |
/**
 * Verifies that a group-level error returned by `GroupCoordinator.handleDeleteOffsets`
 * makes the `deleteOffsets` future complete exceptionally, here with an
 * [[InvalidGroupIdException]] for `Errors.INVALID_GROUP_ID`.
 */
@Test
def testDeleteOffsetsWithGroupLevelError(): Unit = {
  val coordinator = mock(classOf[GroupCoordinator])
  val adapter = new GroupCoordinatorAdapter(coordinator, Time.SYSTEM)

  val foo0 = new TopicPartition("foo", 0)
  val foo1 = new TopicPartition("foo", 1)

  val context = makeContext(ApiKeys.OFFSET_DELETE, ApiKeys.OFFSET_DELETE.latestVersion)

  val requestPartitions = List(
    new OffsetDeleteRequestPartition().setPartitionIndex(0),
    new OffsetDeleteRequestPartition().setPartitionIndex(1)
  )
  val requestTopic = new OffsetDeleteRequestTopic()
    .setName("foo")
    .setPartitions(requestPartitions.asJava)
  val deleteRequest = new OffsetDeleteRequestData()
    .setGroupId("group")
    .setTopics(new OffsetDeleteRequestTopicCollection(List(requestTopic).asJava.iterator))
  val buffers = BufferSupplier.create()

  // The coordinator reports a group-level failure and no per-partition results.
  val coordinatorResult = (Errors.INVALID_GROUP_ID, Map.empty[TopicPartition, Errors])
  when(coordinator.handleDeleteOffsets(
    deleteRequest.groupId,
    Seq(foo0, foo1),
    RequestLocal(buffers)
  )).thenReturn(coordinatorResult)

  val response = adapter.deleteOffsets(context, deleteRequest, buffers)

  assertTrue(response.isDone)
  assertTrue(response.isCompletedExceptionally)
  assertFutureThrows(response, classOf[InvalidGroupIdException])
}
| } |