| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.coordinator.group |
| |
| import java.util.{Optional, OptionalInt} |
| import kafka.common.OffsetAndMetadata |
| import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal} |
| import kafka.utils._ |
| import kafka.utils.timer.MockTimer |
| import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} |
| import org.apache.kafka.common.protocol.Errors |
| import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} |
| import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse |
| import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult} |
| |
| import java.util.concurrent.TimeUnit |
| import java.util.concurrent.locks.ReentrantLock |
| import kafka.cluster.Partition |
| import kafka.zk.KafkaZkClient |
| import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription |
| import org.apache.kafka.clients.consumer.internals.ConsumerProtocol |
| import org.apache.kafka.common.internals.Topic |
| import org.apache.kafka.common.metrics.Metrics |
| import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity |
| import org.apache.kafka.server.util.{KafkaScheduler, MockTime} |
| import org.apache.kafka.storage.internals.log.AppendOrigin |
| import org.junit.jupiter.api.Assertions._ |
| import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} |
| import org.junit.jupiter.params.ParameterizedTest |
| import org.junit.jupiter.params.provider.ValueSource |
| import org.mockito.{ArgumentCaptor, ArgumentMatchers} |
| import org.mockito.ArgumentMatchers.{any, anyLong, anyShort} |
| import org.mockito.Mockito.{mock, when} |
| |
| import scala.jdk.CollectionConverters._ |
| import scala.collection.{Seq, mutable} |
| import scala.collection.mutable.ArrayBuffer |
| import scala.concurrent.duration.Duration |
| import scala.concurrent.{Await, Future, Promise, TimeoutException} |
| |
| class GroupCoordinatorTest { |
| import GroupCoordinatorTest._ |
| |
  // Callback type aliases mirroring the GroupCoordinator handler signatures.
  type JoinGroupCallback = JoinGroupResult => Unit
  type SyncGroupCallback = SyncGroupResult => Unit
  type HeartbeatCallbackParams = Errors
  type HeartbeatCallback = Errors => Unit
  type CommitOffsetCallbackParams = Map[TopicIdPartition, Errors]
  type CommitOffsetCallback = Map[TopicIdPartition, Errors] => Unit
  type LeaveGroupCallback = LeaveGroupResult => Unit

  val ClientId = "consumer-test"
  val ClientHost = "localhost"
  val GroupMinSessionTimeout = 10               // ms; broker-enforced lower bound
  val GroupMaxSessionTimeout = 10 * 60 * 1000   // ms; broker-enforced upper bound
  val GroupMaxSize = 4                          // max members per group in these tests
  val DefaultRebalanceTimeout = 500             // ms
  val DefaultSessionTimeout = 500               // ms
  val GroupInitialRebalanceDelay = 50           // ms
  // Mutable fixtures, (re)created in setUp for every test.
  var timer: MockTimer = _
  var groupCoordinator: GroupCoordinator = _
  var replicaManager: ReplicaManager = _
  var scheduler: KafkaScheduler = _
  var zkClient: KafkaZkClient = _

  private val groupId = "groupId"
  private val protocolType = "consumer"
  private val protocolName = "range"
  private val memberId = "memberId"
  private val groupInstanceId = "groupInstanceId"
  private val leaderInstanceId = "leader"
  private val followerInstanceId = "follower"
  private val invalidMemberId = "invalidMember"
  private val metadata = Array[Byte]()
  private val protocols = List((protocolName, metadata))
  private val protocolSuperset = List((protocolName, metadata), ("roundrobin", metadata))
  private val requireStable = true
  private var groupPartitionId: Int = -1

  // This group id is chosen so that (hashCode % #partitions) differs from
  // groupId's, i.e. it maps to a partition this coordinator does NOT own.
  private val otherGroupId = "otherGroup"
| |
| @BeforeEach |
| def setUp(): Unit = { |
| val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") |
| props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, GroupMinSessionTimeout.toString) |
| props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, GroupMaxSessionTimeout.toString) |
| props.setProperty(KafkaConfig.GroupMaxSizeProp, GroupMaxSize.toString) |
| props.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, GroupInitialRebalanceDelay.toString) |
| // make two partitions of the group topic to make sure some partitions are not owned by the coordinator |
| val ret = mutable.Map[String, Map[Int, Seq[Int]]]() |
| ret += (Topic.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1))) |
| |
| replicaManager = mock(classOf[ReplicaManager]) |
| |
| zkClient = mock(classOf[KafkaZkClient]) |
| // make two partitions of the group topic to make sure some partitions are not owned by the coordinator |
| when(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).thenReturn(Some(2)) |
| |
| timer = new MockTimer |
| |
| val config = KafkaConfig.fromProps(props) |
| |
| val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false) |
| val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, config.brokerId, reaperEnabled = false) |
| |
| groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, new Metrics()) |
| groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions), |
| enableMetadataExpiration = false) |
| |
| // add the partition into the owned partition list |
| groupPartitionId = groupCoordinator.partitionFor(groupId) |
| groupCoordinator.groupManager.addOwnedPartition(groupPartitionId) |
| } |
| |
| @AfterEach |
| def tearDown(): Unit = { |
| if (groupCoordinator != null) |
| groupCoordinator.shutdown() |
| } |
| |
  /**
   * Marks the __consumer_offsets partition owning `otherGroupId` as loading
   * and verifies, API by API, what the coordinator answers while the load is
   * in flight — and that groups on non-loading partitions stay reachable.
   */
  @Test
  def testRequestHandlingWhileLoadingInProgress(): Unit = {
    // otherGroupId maps to a different offsets partition than groupId.
    val otherGroupPartitionId = groupCoordinator.groupManager.partitionFor(otherGroupId)
    assertTrue(otherGroupPartitionId != groupPartitionId)

    groupCoordinator.groupManager.addLoadingPartition(otherGroupPartitionId)
    assertTrue(groupCoordinator.groupManager.isGroupLoading(otherGroupId))

    // Dynamic Member JoinGroup
    var joinGroupResponse: Option[JoinGroupResult] = None
    groupCoordinator.handleJoinGroup(otherGroupId, memberId, None, true, true, "clientId", "clientHost", 60000, 10000, "consumer",
      List("range" -> new Array[Byte](0)), result => { joinGroupResponse = Some(result)})
    assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), joinGroupResponse.map(_.error))

    // Static Member JoinGroup
    groupCoordinator.handleJoinGroup(otherGroupId, memberId, Some("groupInstanceId"), false, true, "clientId", "clientHost", 60000, 10000, "consumer",
      List("range" -> new Array[Byte](0)), result => { joinGroupResponse = Some(result)})
    assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), joinGroupResponse.map(_.error))

    // SyncGroup — note the error here is REBALANCE_IN_PROGRESS, not
    // COORDINATOR_LOAD_IN_PROGRESS like the other APIs.
    var syncGroupResponse: Option[Errors] = None
    groupCoordinator.handleSyncGroup(otherGroupId, 1, memberId, Some("consumer"), Some("range"), None, Map.empty[String, Array[Byte]],
      syncGroupResult => syncGroupResponse = Some(syncGroupResult.error))
    assertEquals(Some(Errors.REBALANCE_IN_PROGRESS), syncGroupResponse)

    // OffsetCommit
    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0 , "foo")
    var offsetCommitErrors = Map.empty[TopicIdPartition, Errors]
    groupCoordinator.handleCommitOffsets(otherGroupId, memberId, None, 1,
      Map(topicIdPartition -> offsetAndMetadata(15L)), result => { offsetCommitErrors = result })
    assertEquals(Map(topicIdPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS), offsetCommitErrors)

    // Heartbeat — succeeds (NONE) even while the group's partition is loading.
    var heartbeatError: Option[Errors] = None
    groupCoordinator.handleHeartbeat(otherGroupId, memberId, None, 1, error => { heartbeatError = Some(error) })
    assertEquals(Some(Errors.NONE), heartbeatError)

    // DescribeGroups
    val (describeGroupError, _) = groupCoordinator.handleDescribeGroup(otherGroupId)
    assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupError)

    // ListGroups
    val (listGroupsError, _) = groupCoordinator.handleListGroups(Set())
    assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsError)

    // DeleteGroups
    val deleteGroupsErrors = groupCoordinator.handleDeleteGroups(Set(otherGroupId))
    assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), deleteGroupsErrors.get(otherGroupId))

    // Check that non-loading groups are still accessible
    assertEquals(Errors.NONE, groupCoordinator.handleDescribeGroup(groupId)._1)

    // After loading, we should be able to access the group
    val otherGroupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, otherGroupPartitionId)
    when(replicaManager.getLog(otherGroupMetadataTopicPartition)).thenReturn(None)
    // Call removeGroupsAndOffsets so that partition removed from loadingPartitions
    groupCoordinator.groupManager.removeGroupsAndOffsets(otherGroupMetadataTopicPartition, OptionalInt.of(1), group => {})
    groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition, 1, group => {}, 0L)
    assertEquals(Errors.NONE, groupCoordinator.handleDescribeGroup(otherGroupId)._1)
  }
| |
| @Test |
| def testOffsetsRetentionMsIntegerOverflow(): Unit = { |
| val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") |
| props.setProperty(KafkaConfig.OffsetsRetentionMinutesProp, Integer.MAX_VALUE.toString) |
| val config = KafkaConfig.fromProps(props) |
| val offsetConfig = GroupCoordinator.offsetConfig(config) |
| assertEquals(offsetConfig.offsetsRetentionMs, Integer.MAX_VALUE * 60L * 1000L) |
| } |
| |
| @Test |
| def testJoinGroupWrongCoordinator(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| var joinGroupResult = dynamicJoinGroup(otherGroupId, memberId, protocolType, protocols) |
| assertEquals(Errors.NOT_COORDINATOR, joinGroupResult.error) |
| |
| joinGroupResult = staticJoinGroup(otherGroupId, memberId, groupInstanceId, protocolType, protocols) |
| assertEquals(Errors.NOT_COORDINATOR, joinGroupResult.error) |
| } |
| |
| @Test |
| def testJoinGroupShouldReceiveErrorIfGroupOverMaxSize(): Unit = { |
| val futures = ArrayBuffer[Future[JoinGroupResult]]() |
| val rebalanceTimeout = GroupInitialRebalanceDelay * 2 |
| |
| for (i <- 1.to(GroupMaxSize)) { |
| futures += sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout) |
| if (i != 1) |
| timer.advanceClock(GroupInitialRebalanceDelay) |
| } |
| // advance clock beyond rebalanceTimeout |
| timer.advanceClock(GroupInitialRebalanceDelay + 1) |
| for (future <- futures) { |
| assertEquals(Errors.NONE, await(future, 1).error) |
| } |
| |
| // Should receive an error since the group is full |
| val errorFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout) |
| assertEquals(Errors.GROUP_MAX_SIZE_REACHED, await(errorFuture, 1).error) |
| } |
| |
  /**
   * With requireKnownMemberId, each dynamic member joins twice: the first
   * request only receives an assigned member id, the second actually joins.
   * One member more than GroupMaxSize is sent; exactly GroupMaxSize must be
   * admitted and rejoin verdicts must match the original ones.
   */
  @Test
  def testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember(): Unit = {
    val requiredKnownMemberId = true
    val nbMembers = GroupMaxSize + 1

    // First JoinRequests
    var futures = 1.to(nbMembers).map { _ =>
      sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
    }

    // Get back the assigned member ids
    val memberIds = futures.map(await(_, 1).memberId)

    // Second JoinRequests
    futures = memberIds.map { memberId =>
      sendJoinGroup(groupId, memberId, protocolType, protocols,
        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
    }

    // advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
    timer.advanceClock(GroupInitialRebalanceDelay + 1)
    // advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
    timer.advanceClock(GroupInitialRebalanceDelay + 1)

    // Awaiting results
    val errors = futures.map(await(_, DefaultRebalanceTimeout + 1).error)

    // Exactly GroupMaxSize joins succeed; the surplus member is rejected.
    assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
    assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))

    // Members which were accepted can rejoin, others are rejected, while
    // completing rebalance
    futures = memberIds.map { memberId =>
      sendJoinGroup(groupId, memberId, protocolType, protocols,
        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
    }

    // Awaiting results
    val rejoinErrors = futures.map(await(_, 1).error)

    // Each member gets the same verdict on rejoin as on the initial join.
    assertEquals(errors, rejoinErrors)
  }
| |
  /**
   * Same over-capacity scenario as the requiredKnownMemberId variant, but
   * members join in a single round (no member-id pre-assignment round).
   */
  @Test
  def testDynamicMembersJoinGroupWithMaxSize(): Unit = {
    val requiredKnownMemberId = false
    val nbMembers = GroupMaxSize + 1

    // JoinRequests
    var futures = 1.to(nbMembers).map { _ =>
      sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
    }

    // advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
    timer.advanceClock(GroupInitialRebalanceDelay + 1)
    // advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
    timer.advanceClock(GroupInitialRebalanceDelay + 1)

    // Awaiting results
    val joinGroupResults = futures.map(await(_, DefaultRebalanceTimeout + 1))
    val errors = joinGroupResults.map(_.error)

    // Exactly GroupMaxSize joins succeed; the surplus member is rejected.
    assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
    assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))

    // Members which were accepted can rejoin, others are rejected, while
    // completing rebalance
    val memberIds = joinGroupResults.map(_.memberId)
    futures = memberIds.map { memberId =>
      sendJoinGroup(groupId, memberId, protocolType, protocols,
        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
    }

    // Awaiting results
    val rejoinErrors = futures.map(await(_, 1).error)

    // Each member gets the same verdict on rejoin as on the initial join.
    assertEquals(errors, rejoinErrors)
  }
| |
  /**
   * Over-capacity scenario for static members: one more instance id than
   * GroupMaxSize joins; only GroupMaxSize are admitted, and rejoining with
   * the same (instanceId, memberId) pairs yields the same verdicts.
   */
  @Test
  def testStaticMembersJoinGroupWithMaxSize(): Unit = {
    val nbMembers = GroupMaxSize + 1
    val instanceIds = 1.to(nbMembers).map(i => Some(s"instance-id-$i"))

    // JoinRequests
    var futures = instanceIds.map { instanceId =>
      sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
        instanceId, DefaultSessionTimeout, DefaultRebalanceTimeout)
    }

    // advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
    timer.advanceClock(GroupInitialRebalanceDelay + 1)
    // advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
    timer.advanceClock(GroupInitialRebalanceDelay + 1)

    // Awaiting results
    val joinGroupResults = futures.map(await(_, DefaultRebalanceTimeout + 1))
    val errors = joinGroupResults.map(_.error)

    // Exactly GroupMaxSize joins succeed; the surplus member is rejected.
    assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
    assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))

    // Members which were accepted can rejoin, others are rejected, while
    // completing rebalance
    val memberIds = joinGroupResults.map(_.memberId)
    futures = instanceIds.zip(memberIds).map { case (instanceId, memberId) =>
      sendJoinGroup(groupId, memberId, protocolType, protocols,
        instanceId, DefaultSessionTimeout, DefaultRebalanceTimeout)
    }

    // Awaiting results
    val rejoinErrors = futures.map(await(_, 1).error)

    // Each member gets the same verdict on rejoin as on the initial join.
    assertEquals(errors, rejoinErrors)
  }
| |
| @Test |
| def testDynamicMembersCanReJoinGroupWithMaxSizeWhileRebalancing(): Unit = { |
| val requiredKnownMemberId = true |
| val nbMembers = GroupMaxSize + 1 |
| |
| // First JoinRequests |
| var futures = 1.to(nbMembers).map { _ => |
| sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, |
| None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId) |
| } |
| |
| // Get back the assigned member ids |
| val memberIds = futures.map(await(_, 1).memberId) |
| |
| // Second JoinRequests |
| memberIds.map { memberId => |
| sendJoinGroup(groupId, memberId, protocolType, protocols, |
| None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId) |
| } |
| |
| // Members can rejoin while rebalancing |
| futures = memberIds.map { memberId => |
| sendJoinGroup(groupId, memberId, protocolType, protocols, |
| None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId) |
| } |
| |
| // advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin |
| timer.advanceClock(GroupInitialRebalanceDelay + 1) |
| // advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin |
| timer.advanceClock(GroupInitialRebalanceDelay + 1) |
| |
| // Awaiting results |
| val errors = futures.map(await(_, DefaultRebalanceTimeout + 1).error) |
| |
| assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE)) |
| assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED)) |
| } |
| |
  /**
   * Pre-populates a Stable group with GroupMaxSize + 2 members (more than
   * the limit can end up in a group, e.g. if the limit was lowered after the
   * group formed), forces a rebalance, and verifies the last-joining surplus
   * members are rejected and removed from the group.
   */
  @Test
  def testLastJoiningMembersAreKickedOutWhenReJoiningGroupWithMaxSize(): Unit = {
    val nbMembers = GroupMaxSize + 2
    val group = new GroupMetadata(groupId, Stable, new MockTime())
    val memberIds = 1.to(nbMembers).map(_ => group.generateMemberId(ClientId, None))

    memberIds.foreach { memberId =>
      group.add(new MemberMetadata(memberId, None, ClientId, ClientHost,
        DefaultRebalanceTimeout, GroupMaxSessionTimeout, protocolType, protocols))
    }
    groupCoordinator.groupManager.addGroup(group)

    groupCoordinator.prepareRebalance(group, "")

    val futures = memberIds.map { memberId =>
      sendJoinGroup(groupId, memberId, protocolType, protocols,
        None, GroupMaxSessionTimeout, DefaultRebalanceTimeout)
    }

    // advance clock past the rebalance timeout to complete the join phase
    // (the original comment incorrectly mentioned GroupInitialRebalanceDelay)
    timer.advanceClock(DefaultRebalanceTimeout + 1)

    // Awaiting results
    val errors = futures.map(await(_, DefaultRebalanceTimeout + 1).error)

    // The first GroupMaxSize members survive; the rest are rejected...
    assertEquals(Set(Errors.NONE), errors.take(GroupMaxSize).toSet)
    assertEquals(Set(Errors.GROUP_MAX_SIZE_REACHED), errors.drop(GroupMaxSize).toSet)

    // ...and are also removed from the group metadata itself.
    memberIds.drop(GroupMaxSize).foreach { memberId =>
      assertFalse(group.has(memberId))
    }
  }
| |
| @Test |
| def testJoinGroupSessionTimeoutTooSmall(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMinSessionTimeout - 1) |
| assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupResult.error) |
| } |
| |
| @Test |
| def testJoinGroupSessionTimeoutTooLarge(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMaxSessionTimeout + 1) |
| assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupResult.error) |
| } |
| |
| @Test |
| def testJoinGroupUnknownConsumerNewGroup(): Unit = { |
| var joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error) |
| |
| joinGroupResult = staticJoinGroup(groupId, memberId, groupInstanceId, protocolType, protocols) |
| assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error) |
| } |
| |
| @Test |
| def testInvalidGroupId(): Unit = { |
| val groupId = "" |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| assertEquals(Errors.INVALID_GROUP_ID, joinGroupResult.error) |
| } |
| |
| @Test |
| def testValidJoinGroup(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| } |
| |
| @Test |
| def testJoinGroupInconsistentProtocolType(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| |
| val otherJoinGroupResult = await(sendJoinGroup(groupId, otherMemberId, "connect", protocols), 1) |
| assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, otherJoinGroupResult.error) |
| } |
| |
| @Test |
| def testJoinGroupWithEmptyProtocolType(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| var joinGroupResult = dynamicJoinGroup(groupId, memberId, "", protocols) |
| assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error) |
| |
| joinGroupResult = staticJoinGroup(groupId, memberId, groupInstanceId, "", protocols) |
| assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error) |
| } |
| |
| @Test |
| def testJoinGroupWithEmptyGroupProtocol(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, List()) |
| assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error) |
| } |
| |
  /**
   * After the JoinGroup/SyncGroup round trip completes, the new-member join
   * timeout must no longer apply; only the (larger) session timeout should
   * eventually remove the member.
   */
  @Test
  def testNewMemberTimeoutCompletion(): Unit = {
    // Session timeout deliberately larger than NewMemberJoinTimeoutMs so the
    // two expiration paths can be told apart below.
    val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
    val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, None, sessionTimeout, DefaultRebalanceTimeout, false)

    timer.advanceClock(GroupInitialRebalanceDelay + 1)

    val joinResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 100, TimeUnit.MILLISECONDS))
    val group = groupCoordinator.groupManager.getGroup(groupId).get
    val memberId = joinResult.memberId

    assertEquals(Errors.NONE, joinResult.error)
    // Once the join completes, no member is flagged as "new" anymore.
    assertEquals(0, group.allMemberMetadata.count(_.isNew))

    val syncGroupResult = syncGroupLeader(groupId, joinResult.generationId, memberId, Map(memberId -> Array[Byte]()))
    assertEquals(Errors.NONE, syncGroupResult.error)
    assertEquals(1, group.size)

    timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 100)

    // Make sure the NewMemberTimeout is not still in effect, and the member is not kicked
    assertEquals(1, group.size)

    // The regular session timeout still applies and removes the member.
    timer.advanceClock(sessionTimeout + 100)
    assertEquals(0, group.size)
  }
| |
  @Test
  def testNewMemberJoinExpiration(): Unit = {
    // This tests new member expiration during a protracted rebalance. We first create a
    // group with one member which uses a large value for session timeout and rebalance timeout.
    // We then join with one new member and let the rebalance hang while we await the first member.
    // The new member join timeout expires and its JoinGroup request is failed.

    val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
    val rebalanceTimeout = GroupCoordinator.NewMemberJoinTimeoutMs * 2

    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
      sessionTimeout, rebalanceTimeout)
    val firstMemberId = firstJoinResult.memberId
    // The sole member becomes the leader.
    assertEquals(firstMemberId, firstJoinResult.leaderId)
    assertEquals(Errors.NONE, firstJoinResult.error)

    val groupOpt = groupCoordinator.groupManager.getGroup(groupId)
    assertTrue(groupOpt.isDefined)
    val group = groupOpt.get
    assertEquals(0, group.allMemberMetadata.count(_.isNew))

    // Second join hangs: the rebalance cannot finish until the first member rejoins.
    val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, None, sessionTimeout, rebalanceTimeout)
    assertFalse(responseFuture.isCompleted)

    assertEquals(2, group.allMembers.size)
    assertEquals(1, group.allMemberMetadata.count(_.isNew))

    val newMember = group.allMemberMetadata.find(_.isNew).get
    assertNotEquals(firstMemberId, newMember.memberId)

    // Crossing NewMemberJoinTimeoutMs fails the pending join request...
    timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 1)
    assertTrue(responseFuture.isCompleted)

    val response = Await.result(responseFuture, Duration(0, TimeUnit.MILLISECONDS))
    assertEquals(Errors.UNKNOWN_MEMBER_ID, response.error)
    // ...and evicts the new member, leaving only the original one.
    assertEquals(1, group.allMembers.size)
    assertEquals(0, group.allMemberMetadata.count(_.isNew))
    assertEquals(firstMemberId, group.allMembers.head)
  }
| |
  @Test
  def testNewMemberFailureAfterJoinGroupCompletion(): Unit = {
    // For old versions of the JoinGroup protocol, new members were subject
    // to expiration if the rebalance took long enough. This test case ensures
    // that following completion of the JoinGroup phase, new members follow
    // normal heartbeat expiration logic.

    // Establish a one-member group and complete its initial sync.
    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
    val firstMemberId = firstJoinResult.memberId
    val firstGenerationId = firstJoinResult.generationId
    assertEquals(firstMemberId, firstJoinResult.leaderId)
    assertEquals(Errors.NONE, firstJoinResult.error)

    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId,
      Map(firstMemberId -> Array[Byte]()))
    assertEquals(Errors.NONE, firstSyncResult.error)

    // A second (new) member joins, and the first member rejoins so the
    // rebalance's join phase can complete.
    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)

    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols,
      requireKnownMemberId = false)

    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
    assertEquals(Errors.NONE, joinResult.error)
    assertEquals(Errors.NONE, otherJoinResult.error)

    // With the join phase done, ordinary session expiration empties the group.
    verifySessionExpiration(groupId)
  }
| |
  @Test
  def testNewMemberFailureAfterSyncGroupCompletion(): Unit = {
    // For old versions of the JoinGroup protocol, new members were subject
    // to expiration if the rebalance took long enough. This test case ensures
    // that following completion of the SyncGroup phase, new members follow
    // normal heartbeat expiration logic.

    // Establish a one-member group and complete its initial sync.
    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
    val firstMemberId = firstJoinResult.memberId
    val firstGenerationId = firstJoinResult.generationId
    assertEquals(firstMemberId, firstJoinResult.leaderId)
    assertEquals(Errors.NONE, firstJoinResult.error)

    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId,
      Map(firstMemberId -> Array[Byte]()))
    assertEquals(Errors.NONE, firstSyncResult.error)

    // A second (new) member joins, and the first member rejoins so the
    // rebalance's join phase can complete.
    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)

    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols,
      requireKnownMemberId = false)

    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
    assertEquals(Errors.NONE, joinResult.error)
    assertEquals(Errors.NONE, otherJoinResult.error)
    val secondGenerationId = joinResult.generationId
    val secondMemberId = otherJoinResult.memberId

    // Complete the sync phase for the new generation: follower first, then
    // the leader supplies assignments for both members.
    sendSyncGroupFollower(groupId, secondGenerationId, secondMemberId)

    val syncGroupResult = syncGroupLeader(groupId, secondGenerationId, firstMemberId,
      Map(firstMemberId -> Array.emptyByteArray, secondMemberId -> Array.emptyByteArray))
    assertEquals(Errors.NONE, syncGroupResult.error)

    // With the sync phase done, ordinary session expiration empties the group.
    verifySessionExpiration(groupId)
  }
| |
| private def verifySessionExpiration(groupId: String): Unit = { |
| when(replicaManager.getMagic(any[TopicPartition])) |
| .thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) |
| |
| timer.advanceClock(DefaultSessionTimeout + 1) |
| |
| val groupMetadata = group(groupId) |
| assertEquals(Empty, groupMetadata.currentState) |
| assertTrue(groupMetadata.allMembers.isEmpty) |
| } |
| |
  /**
   * Two members with disjoint protocol lists cannot form a group: the first
   * ("range" only) succeeds, the second ("roundrobin" only) is rejected.
   */
  @Test
  def testJoinGroupInconsistentGroupProtocol(): Unit = {
    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
    val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID

    // First member supports only "range"; its join stays pending...
    val joinGroupFuture = sendJoinGroup(groupId, memberId, protocolType, List(("range", metadata)))

    // ...while the second member, supporting only "roundrobin", joins with
    // no protocol in common with the group.
    val otherJoinGroupResult = dynamicJoinGroup(groupId, otherMemberId, protocolType, List(("roundrobin", metadata)))
    timer.advanceClock(GroupInitialRebalanceDelay + 1)

    val joinGroupResult = await(joinGroupFuture, 1)
    assertEquals(Errors.NONE, joinGroupResult.error)
    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, otherJoinGroupResult.error)
  }
| |
| @Test |
| def testJoinGroupUnknownConsumerExistingGroup(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val otherMemberId = "memberId" |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| |
| val otherJoinGroupResult = await(sendJoinGroup(groupId, otherMemberId, protocolType, protocols), 1) |
| assertEquals(Errors.UNKNOWN_MEMBER_ID, otherJoinGroupResult.error) |
| } |
| |
| @Test |
| def testJoinGroupUnknownConsumerNewDeadGroup(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val deadGroupId = "deadGroupId" |
| |
| groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime())) |
| val joinGroupResult = dynamicJoinGroup(deadGroupId, memberId, protocolType, protocols) |
| assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, joinGroupResult.error) |
| } |
| |
| @Test |
| def testSyncDeadGroup(): Unit = { |
| val memberId = "memberId" |
| val deadGroupId = "deadGroupId" |
| |
| groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime())) |
| val syncGroupResult = syncGroupFollower(deadGroupId, 1, memberId) |
| assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, syncGroupResult.error) |
| } |
| |
  @Test
  def testJoinGroupSecondJoinInconsistentProtocol(): Unit = {
    // First join with an unknown member id: when requireKnownMemberId is set, the
    // coordinator rejects the join but hands back a freshly assigned member id.
    var responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, requireKnownMemberId = true)
    var joinGroupResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 1, TimeUnit.MILLISECONDS))
    assertEquals(Errors.MEMBER_ID_REQUIRED, joinGroupResult.error)
    val memberId = joinGroupResult.memberId

    // Sending an inconsistent protocol shall be refused
    responseFuture = sendJoinGroup(groupId, memberId, protocolType, List(), requireKnownMemberId = true)
    joinGroupResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 1, TimeUnit.MILLISECONDS))
    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)

    // Sending consistent protocol shall be accepted
    responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, requireKnownMemberId = true)
    // Advance past the initial rebalance delay so the join round can complete.
    timer.advanceClock(GroupInitialRebalanceDelay + 1)
    joinGroupResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 1, TimeUnit.MILLISECONDS))
    assertEquals(Errors.NONE, joinGroupResult.error)
  }
| |
| @Test |
| def staticMemberJoinAsFirstMember(): Unit = { |
| val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, groupInstanceId, protocolType, protocols) |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| } |
| |
| @Test |
| def staticMemberReJoinWithExplicitUnknownMemberId(): Unit = { |
| var joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, groupInstanceId, protocolType, protocols) |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| |
| val unknownMemberId = "unknown_member" |
| joinGroupResult = staticJoinGroup(groupId, unknownMemberId, groupInstanceId, protocolType, protocols) |
| assertEquals(Errors.FENCED_INSTANCE_ID, joinGroupResult.error) |
| } |
| |
  // Verifies that when a duplicate join arrives for an already-rejoined static instance id,
  // the previously rejoined follower is fenced immediately while the duplicate's join
  // request stays parked (not yet completed) in the join purgatory.
  @Test
  def staticMemberFenceDuplicateRejoinedFollower(): Unit = {
    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)

    // A third member joins will trigger rebalance.
    sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
    timer.advanceClock(1)
    assertTrue(getGroup(groupId).is(PreparingRebalance))

    timer.advanceClock(1)
    // Old follower rejoins group will be matching current member.id.
    val oldFollowerJoinGroupFuture =
      sendJoinGroup(groupId, rebalanceResult.followerId, protocolType, protocols, groupInstanceId = Some(followerInstanceId))

    timer.advanceClock(1)
    // Duplicate follower joins group with unknown member id will trigger member.id replacement.
    val duplicateFollowerJoinFuture =
      sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, groupInstanceId = Some(followerInstanceId))

    timer.advanceClock(1)
    // Old member shall be fenced immediately upon duplicate follower joins.
    val oldFollowerJoinGroupResult = Await.result(oldFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
    checkJoinGroupResult(oldFollowerJoinGroupResult,
      Errors.FENCED_INSTANCE_ID,
      -1,
      Set.empty,
      PreparingRebalance,
      None)
    // The duplicate's join is still pending because the rebalance has not completed.
    verifyDelayedTaskNotCompleted(duplicateFollowerJoinFuture)
  }
| |
  // Verifies that a static follower whose member id was replaced by a duplicate join
  // gets its in-flight SyncGroup fenced, and that the duplicate eventually becomes
  // the new leader of the next generation.
  @Test
  def staticMemberFenceDuplicateSyncingFollowerAfterMemberIdChanged(): Unit = {
    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)

    // Known leader rejoins will trigger rebalance.
    val leaderJoinGroupFuture =
      sendJoinGroup(groupId, rebalanceResult.leaderId, protocolType, protocols, groupInstanceId = Some(leaderInstanceId))
    timer.advanceClock(1)
    assertTrue(getGroup(groupId).is(PreparingRebalance))

    timer.advanceClock(1)
    // Old follower rejoins group will match current member.id.
    val oldFollowerJoinGroupFuture =
      sendJoinGroup(groupId, rebalanceResult.followerId, protocolType, protocols, groupInstanceId = Some(followerInstanceId))

    timer.advanceClock(1)
    // Both static members rejoined, so the rebalance completes at generation + 1.
    val leaderJoinGroupResult = Await.result(leaderJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
    checkJoinGroupResult(leaderJoinGroupResult,
      Errors.NONE,
      rebalanceResult.generation + 1,
      Set(leaderInstanceId, followerInstanceId),
      CompletingRebalance,
      Some(protocolType))
    assertEquals(rebalanceResult.leaderId, leaderJoinGroupResult.memberId)
    assertEquals(rebalanceResult.leaderId, leaderJoinGroupResult.leaderId)

    // Old follower shall be getting a successful join group response.
    val oldFollowerJoinGroupResult = Await.result(oldFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
    checkJoinGroupResult(oldFollowerJoinGroupResult,
      Errors.NONE,
      rebalanceResult.generation + 1,
      Set.empty,
      CompletingRebalance,
      Some(protocolType),
      expectedLeaderId = leaderJoinGroupResult.memberId)
    assertEquals(rebalanceResult.followerId, oldFollowerJoinGroupResult.memberId)
    assertEquals(rebalanceResult.leaderId, oldFollowerJoinGroupResult.leaderId)
    assertTrue(getGroup(groupId).is(CompletingRebalance))

    // Duplicate follower joins group with unknown member id will trigger member.id replacement,
    // and will also trigger a rebalance under CompletingRebalance state; the old follower sync callback
    // will return fenced exception while broker replaces the member identity with the duplicate follower joins.
    val oldFollowerSyncGroupFuture = sendSyncGroupFollower(groupId, oldFollowerJoinGroupResult.generationId,
      oldFollowerJoinGroupResult.memberId, Some(protocolType), Some(protocolName), Some(followerInstanceId))

    val duplicateFollowerJoinFuture =
      sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, groupInstanceId = Some(followerInstanceId))
    timer.advanceClock(1)

    // The replaced follower's pending sync is fenced and the group rebalances again.
    val oldFollowerSyncGroupResult = Await.result(oldFollowerSyncGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
    assertEquals(Errors.FENCED_INSTANCE_ID, oldFollowerSyncGroupResult.error)
    assertTrue(getGroup(groupId).is(PreparingRebalance))

    // Let the second rebalance (initial delay + rebalance timeout) run to completion.
    timer.advanceClock(GroupInitialRebalanceDelay + 1)
    timer.advanceClock(DefaultRebalanceTimeout + 1)

    val duplicateFollowerJoinGroupResult = Await.result(duplicateFollowerJoinFuture, Duration(1, TimeUnit.MILLISECONDS))
    checkJoinGroupResult(duplicateFollowerJoinGroupResult,
      Errors.NONE,
      rebalanceResult.generation + 2,
      Set(followerInstanceId), // this follower will become the new leader, and hence it would have the member list
      CompletingRebalance,
      Some(protocolType),
      expectedLeaderId = duplicateFollowerJoinGroupResult.memberId)
    assertTrue(getGroup(groupId).is(CompletingRebalance))
  }
| |
  // Verifies that once a duplicate join has replaced a static follower's member id,
  // a subsequent rejoin attempt using the stale member id is fenced, while the
  // duplicate and the leader complete the rebalance normally.
  @Test
  def staticMemberFenceDuplicateRejoiningFollowerAfterMemberIdChanged(): Unit = {
    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)

    // Known leader rejoins will trigger rebalance.
    val leaderJoinGroupFuture =
      sendJoinGroup(groupId, rebalanceResult.leaderId, protocolType, protocols, groupInstanceId = Some(leaderInstanceId))
    timer.advanceClock(1)
    assertTrue(getGroup(groupId).is(PreparingRebalance))

    // Duplicate follower joins group will trigger member.id replacement.
    val duplicateFollowerJoinGroupFuture =
      sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, groupInstanceId = Some(followerInstanceId))

    timer.advanceClock(1)
    // Old follower rejoins group will fail because member.id already updated.
    val oldFollowerJoinGroupFuture =
      sendJoinGroup(groupId, rebalanceResult.followerId, protocolType, protocols, groupInstanceId = Some(followerInstanceId))

    // Leader completes the rebalance at the new generation with the full member list.
    val leaderRejoinGroupResult = Await.result(leaderJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
    checkJoinGroupResult(leaderRejoinGroupResult,
      Errors.NONE,
      rebalanceResult.generation + 1,
      Set(leaderInstanceId, followerInstanceId),
      CompletingRebalance,
      Some(protocolType))

    // The duplicate joined successfully under a newly generated member id.
    val duplicateFollowerJoinGroupResult = Await.result(duplicateFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
    checkJoinGroupResult(duplicateFollowerJoinGroupResult,
      Errors.NONE,
      rebalanceResult.generation + 1,
      Set.empty,
      CompletingRebalance,
      Some(protocolType))
    assertNotEquals(rebalanceResult.followerId, duplicateFollowerJoinGroupResult.memberId)

    // The stale member id is fenced (generation -1, no members in the response).
    val oldFollowerJoinGroupResult = Await.result(oldFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
    checkJoinGroupResult(oldFollowerJoinGroupResult,
      Errors.FENCED_INSTANCE_ID,
      -1,
      Set.empty,
      CompletingRebalance,
      None)
  }
| |
  // A static member rejoining with its known member id and unchanged metadata completes
  // the join immediately, and a following leader sync moves the group to Stable.
  @Test
  def staticMemberRejoinWithKnownMemberId(): Unit = {
    var joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, groupInstanceId, protocolType, protocols)
    assertEquals(Errors.NONE, joinGroupResult.error)

    val assignedMemberId = joinGroupResult.memberId
    // The second join group should return immediately since we are using the same metadata during CompletingRebalance.
    val rejoinResponseFuture = sendJoinGroup(groupId, assignedMemberId, protocolType, protocols, Some(groupInstanceId))
    timer.advanceClock(1)
    joinGroupResult = Await.result(rejoinResponseFuture, Duration(1, TimeUnit.MILLISECONDS))
    assertEquals(Errors.NONE, joinGroupResult.error)
    assertTrue(getGroup(groupId).is(CompletingRebalance))

    // Completing the sync as leader transitions the group to Stable.
    val syncGroupFuture = sendSyncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId,
      Some(protocolType), Some(protocolName), Some(groupInstanceId), Map(assignedMemberId -> Array[Byte]()))
    timer.advanceClock(1)
    val syncGroupResult = Await.result(syncGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
    assertEquals(Errors.NONE, syncGroupResult.error)
    assertTrue(getGroup(groupId).is(Stable))
  }
| |
  // A static leader rejoining with an unknown member id gets a replacement member id
  // without triggering a rebalance; the old member id is fenced afterwards.
  // Parameterized over whether the client supports skipping assignment (KIP-814 style).
  @ParameterizedTest
  @ValueSource(booleans = Array(true, false))
  def staticMemberRejoinWithLeaderIdAndUnknownMemberId(supportSkippingAssignment: Boolean): Unit = {
    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)

    // A static leader rejoin with unknown id will not trigger rebalance, and no assignment will be returned.
    val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
      leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1, supportSkippingAssignment = supportSkippingAssignment)

    checkJoinGroupResult(joinGroupResult,
      Errors.NONE,
      rebalanceResult.generation, // The group should be at the same generation
      // Only clients that can skip assignment are handed the member list back.
      if (supportSkippingAssignment) Set(leaderInstanceId, followerInstanceId) else Set.empty,
      Stable,
      Some(protocolType),
      // With skipping supported, the rejoined member keeps leadership under its new id;
      // otherwise the previous leader id is reported.
      if (supportSkippingAssignment) joinGroupResult.memberId else rebalanceResult.leaderId,
      expectedSkipAssignment = supportSkippingAssignment
    )

    // The replaced member id can no longer be used to join.
    val oldLeaderJoinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
    assertEquals(Errors.FENCED_INSTANCE_ID, oldLeaderJoinGroupResult.error)

    // Old leader will get fenced.
    val oldLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId,
      Map.empty, None, None, Some(leaderInstanceId))
    assertEquals(Errors.FENCED_INSTANCE_ID, oldLeaderSyncGroupResult.error)

    // Calling sync on old leader.id will fail because that leader.id is no longer valid and replaced.
    val newLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty)
    assertEquals(Errors.UNKNOWN_MEMBER_ID, newLeaderSyncGroupResult.error)
  }
| |
  // A static leader rejoining with its known member id forces a rebalance; the follower,
  // which never rejoins, is evicted by its (deliberately short) session timeout.
  @Test
  def staticMemberRejoinWithLeaderIdAndKnownMemberId(): Unit = {
    // Short session timeout so the follower expires within the rebalance window below.
    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId,
      sessionTimeout = DefaultRebalanceTimeout / 2)

    // A static leader with known id rejoin will trigger rebalance.
    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId,
      protocolType, protocolSuperset, clockAdvance = DefaultRebalanceTimeout + 1)
    // Timeout follower in the meantime.
    assertFalse(getGroup(groupId).hasStaticMember(followerInstanceId))
    checkJoinGroupResult(joinGroupResult,
      Errors.NONE,
      rebalanceResult.generation + 1, // The group has promoted to the new generation.
      Set(leaderInstanceId),
      CompletingRebalance,
      Some(protocolType),
      rebalanceResult.leaderId,
      rebalanceResult.leaderId)
  }
| |
| @Test |
| def staticMemberRejoinWithLeaderIdAndUnexpectedDeadGroup(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| getGroup(groupId).transitionTo(Dead) |
| |
| val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocols, clockAdvance = 1) |
| assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, joinGroupResult.error) |
| } |
| |
| @Test |
| def staticMemberRejoinWithLeaderIdAndUnexpectedEmptyGroup(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| getGroup(groupId).transitionTo(PreparingRebalance) |
| getGroup(groupId).transitionTo(Empty) |
| |
| val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocols, clockAdvance = 1) |
| assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error) |
| } |
| |
  // A static follower rejoining with a changed protocol triggers a rebalance; because the
  // old leader never rejoins within the window, leadership moves to the follower.
  @Test
  def staticMemberRejoinWithFollowerIdAndChangeOfProtocol(): Unit = {
    // Double the session timeout so the follower itself survives the waiting window below.
    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId, sessionTimeout = DefaultSessionTimeout * 2)

    // A static follower rejoin with changed protocol will trigger rebalance.
    val newProtocols = List(("roundrobin", metadata))
    // Old leader hasn't joined in the meantime, triggering a re-election.
    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, newProtocols, clockAdvance = DefaultSessionTimeout + 1)

    assertEquals(rebalanceResult.followerId, joinGroupResult.memberId)
    // The old leader keeps its static registration even though it lost leadership.
    assertTrue(getGroup(groupId).hasStaticMember(leaderInstanceId))
    assertTrue(getGroup(groupId).isLeader(rebalanceResult.followerId))
    checkJoinGroupResult(joinGroupResult,
      Errors.NONE,
      rebalanceResult.generation + 1, // The group has promoted to the new generation, and leader has changed because old one times out.
      Set(leaderInstanceId, followerInstanceId),
      CompletingRebalance,
      Some(protocolType),
      rebalanceResult.followerId,
      rebalanceResult.followerId)
  }
| |
| @Test |
| def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWithSelectedProtocolChanged(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| // A static follower rejoin with protocol changed and also cause updated group's selectedProtocol changed |
| // should trigger rebalance. |
| val selectedProtocols = getGroup(groupId).selectProtocol |
| val newProtocols = List(("roundrobin", metadata)) |
| assert(!newProtocols.map(_._1).contains(selectedProtocols)) |
| // Old leader hasn't joined in the meantime, triggering a re-election. |
| val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, newProtocols, clockAdvance = DefaultSessionTimeout + 1) |
| |
| checkJoinGroupResult(joinGroupResult, |
| Errors.NONE, |
| rebalanceResult.generation + 1, |
| Set(leaderInstanceId, followerInstanceId), |
| CompletingRebalance, |
| Some(protocolType)) |
| |
| assertTrue(getGroup(groupId).isLeader(joinGroupResult.memberId)) |
| assertNotEquals(rebalanceResult.followerId, joinGroupResult.memberId) |
| assertEquals(joinGroupResult.protocolName, Some("roundrobin")) |
| } |
| |
  // If persisting the member id replacement fails, the replacement must be rolled back:
  // the old member id stays valid for both join and sync.
  @Test
  def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchangedPersistenceFailure(): Unit = {
    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)

    val selectedProtocol = getGroup(groupId).selectProtocol
    val newProtocols = List((selectedProtocol, metadata))
    // Timeout old leader in the meantime.
    // Simulate a log-append failure while persisting the member id replacement.
    val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
      followerInstanceId, protocolType, newProtocols, clockAdvance = 1, appendRecordError = Errors.MESSAGE_TOO_LARGE)

    checkJoinGroupResult(joinGroupResult,
      Errors.UNKNOWN_SERVER_ERROR,
      rebalanceResult.generation,
      Set.empty,
      Stable,
      Some(protocolType))

    // Join with old member id will not fail because the member id is not updated because of persistence failure
    assertNotEquals(rebalanceResult.followerId, joinGroupResult.memberId)
    val oldFollowerJoinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, newProtocols, clockAdvance = 1)
    assertEquals(Errors.NONE, oldFollowerJoinGroupResult.error)

    // Sync with old member id will also not fail because the member id is not updated because of persistence failure
    val syncGroupWithOldMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation,
      rebalanceResult.followerId, None, None, Some(followerInstanceId))
    assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error)
  }
| |
| @Test |
| def staticMemberRejoinWithUpdatedSessionAndRebalanceTimeoutsButCannotPersistChange(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout, appendRecordError = Errors.MESSAGE_TOO_LARGE) |
| checkJoinGroupResult(joinGroupResult, |
| Errors.UNKNOWN_SERVER_ERROR, |
| rebalanceResult.generation, |
| Set.empty, |
| Stable, |
| Some(protocolType)) |
| assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined) |
| val group = groupCoordinator.groupManager.getGroup(groupId).get |
| group.allMemberMetadata.foreach { member => |
| assertEquals(member.sessionTimeoutMs, DefaultSessionTimeout) |
| assertEquals(member.rebalanceTimeoutMs, DefaultRebalanceTimeout) |
| } |
| } |
| |
| |
| @Test |
| def staticMemberRejoinWithUpdatedSessionAndRebalanceTimeoutsAndPersistChange(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| val followerJoinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout) |
| checkJoinGroupResult(followerJoinGroupResult, |
| Errors.NONE, |
| rebalanceResult.generation, |
| Set.empty, |
| Stable, |
| Some(protocolType)) |
| val leaderJoinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout) |
| checkJoinGroupResult(leaderJoinGroupResult, |
| Errors.NONE, |
| rebalanceResult.generation, |
| Set(leaderInstanceId, followerInstanceId), |
| Stable, |
| Some(protocolType), |
| leaderJoinGroupResult.leaderId, |
| leaderJoinGroupResult.memberId, |
| true) |
| assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined) |
| val group = groupCoordinator.groupManager.getGroup(groupId).get |
| group.allMemberMetadata.foreach { member => |
| assertEquals(member.sessionTimeoutMs, 2 * DefaultSessionTimeout) |
| assertEquals(member.rebalanceTimeoutMs, 2 * DefaultRebalanceTimeout) |
| } |
| } |
  // A static follower rejoining with an unknown member id but a protocol list that keeps
  // the group's selected protocol unchanged must NOT trigger a rebalance; the member id
  // is silently replaced, fencing the old id for both join and sync.
  @Test
  def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchanged(): Unit = {
    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)

    // A static follower rejoin with protocol changing to leader protocol subset won't trigger rebalance if updated
    // group's selectProtocol remain unchanged.
    val selectedProtocol = getGroup(groupId).selectProtocol
    val newProtocols = List((selectedProtocol, metadata))
    // Timeout old leader in the meantime.
    val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
      followerInstanceId, protocolType, newProtocols, clockAdvance = 1)

    checkJoinGroupResult(joinGroupResult,
      Errors.NONE,
      rebalanceResult.generation,
      Set.empty,
      Stable,
      Some(protocolType))

    // Join with old member id will fail because the member id is updated
    assertNotEquals(rebalanceResult.followerId, joinGroupResult.memberId)
    val oldFollowerJoinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, newProtocols, clockAdvance = 1)
    assertEquals(Errors.FENCED_INSTANCE_ID, oldFollowerJoinGroupResult.error)

    // Sync with old member id will fail because the member id is updated
    val syncGroupWithOldMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation,
      rebalanceResult.followerId, None, None, Some(followerInstanceId))
    assertEquals(Errors.FENCED_INSTANCE_ID, syncGroupWithOldMemberIdResult.error)

    // The replacement id syncs fine and receives the follower's previous assignment.
    val syncGroupWithNewMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation,
      joinGroupResult.memberId, None, None, Some(followerInstanceId))
    assertEquals(Errors.NONE, syncGroupWithNewMemberIdResult.error)
    assertEquals(rebalanceResult.followerAssignment, syncGroupWithNewMemberIdResult.memberAssignment)
  }
| |
  // Leader rejoin with a known id triggers a rebalance that completes once the follower
  // also rejoins; a later follower rejoin with a smaller protocol list triggers a second
  // rebalance in which the follower becomes leader (the old leader never rejoins).
  @Test
  def staticMemberRejoinWithKnownLeaderIdToTriggerRebalanceAndFollowerWithChangeofProtocol(): Unit = {
    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)

    // A static leader rejoin with known member id will trigger rebalance.
    val leaderRejoinGroupFuture = sendJoinGroup(groupId, rebalanceResult.leaderId, protocolType,
      protocolSuperset, Some(leaderInstanceId))
    // Rebalance complete immediately after follower rejoin.
    val followerRejoinWithFuture = sendJoinGroup(groupId, rebalanceResult.followerId, protocolType,
      protocolSuperset, Some(followerInstanceId))

    timer.advanceClock(1)

    // Leader should get the same assignment as last round.
    checkJoinGroupResult(await(leaderRejoinGroupFuture, 1),
      Errors.NONE,
      rebalanceResult.generation + 1, // The group has promoted to the new generation.
      Set(leaderInstanceId, followerInstanceId),
      CompletingRebalance,
      Some(protocolType),
      rebalanceResult.leaderId,
      rebalanceResult.leaderId)

    checkJoinGroupResult(await(followerRejoinWithFuture, 1),
      Errors.NONE,
      rebalanceResult.generation + 1, // The group has promoted to the new generation.
      Set.empty,
      CompletingRebalance,
      Some(protocolType),
      rebalanceResult.leaderId,
      rebalanceResult.followerId)

    // The follower protocol changed from protocolSuperset to general protocols.
    val followerRejoinWithProtocolChangeFuture = sendJoinGroup(groupId, rebalanceResult.followerId,
      protocolType, protocols, Some(followerInstanceId))
    // The group will transit to PreparingRebalance due to protocol change from follower.
    assertTrue(getGroup(groupId).is(PreparingRebalance))

    // Let the rebalance timeout expire so the second rebalance completes without the leader.
    timer.advanceClock(DefaultRebalanceTimeout + 1)
    checkJoinGroupResult(await(followerRejoinWithProtocolChangeFuture, 1),
      Errors.NONE,
      rebalanceResult.generation + 2, // The group has promoted to the new generation.
      Set(followerInstanceId),
      CompletingRebalance,
      Some(protocolType),
      rebalanceResult.followerId,
      rebalanceResult.followerId)
  }
| |
  // A static follower rejoining with an unknown member id (no protocol change) gets a new
  // member id without a rebalance and can sync to retrieve its previous assignment.
  @Test
  def staticMemberRejoinAsFollowerWithUnknownMemberId(): Unit = {
    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)

    // A static follower rejoin with no protocol change will not trigger rebalance.
    val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)

    // Old leader shouldn't be timed out.
    assertTrue(getGroup(groupId).hasStaticMember(leaderInstanceId))
    checkJoinGroupResult(joinGroupResult,
      Errors.NONE,
      rebalanceResult.generation, // The group has no change.
      Set.empty,
      Stable,
      Some(protocolType))

    // The follower's member id was replaced by a newly generated one.
    assertNotEquals(rebalanceResult.followerId, joinGroupResult.memberId)

    // Syncing under the new id returns the assignment from the previous rebalance.
    val syncGroupResult = syncGroupFollower(groupId, rebalanceResult.generation, joinGroupResult.memberId)
    assertEquals(Errors.NONE, syncGroupResult.error)
    assertEquals(rebalanceResult.followerAssignment, syncGroupResult.memberAssignment)
  }
| |
| @Test |
| def staticMemberRejoinAsFollowerWithKnownMemberIdAndNoProtocolChange(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| // A static follower rejoin with no protocol change will not trigger rebalance. |
| val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1) |
| |
| // Old leader shouldn't be timed out. |
| assertTrue(getGroup(groupId).hasStaticMember(leaderInstanceId)) |
| checkJoinGroupResult(joinGroupResult, |
| Errors.NONE, |
| rebalanceResult.generation, // The group has no change. |
| Set.empty, |
| Stable, |
| Some(protocolType), |
| rebalanceResult.leaderId, |
| rebalanceResult.followerId) |
| } |
| |
| @Test |
| def staticMemberRejoinAsFollowerWithMismatchedMemberId(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1) |
| assertEquals(Errors.FENCED_INSTANCE_ID, joinGroupResult.error) |
| } |
| |
| @Test |
| def staticMemberRejoinAsLeaderWithMismatchedMemberId(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1) |
| assertEquals(Errors.FENCED_INSTANCE_ID, joinGroupResult.error) |
| } |
| |
| @Test |
| def staticMemberSyncAsLeaderWithInvalidMemberId(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| val syncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, "invalid", |
| Map.empty, None, None, Some(leaderInstanceId)) |
| assertEquals(Errors.FENCED_INSTANCE_ID, syncGroupResult.error) |
| } |
| |
| @Test |
| def staticMemberHeartbeatLeaderWithInvalidMemberId(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| val syncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| val validHeartbeatResult = heartbeat(groupId, rebalanceResult.leaderId, rebalanceResult.generation) |
| assertEquals(Errors.NONE, validHeartbeatResult) |
| |
| val invalidHeartbeatResult = heartbeat(groupId, invalidMemberId, rebalanceResult.generation, Some(leaderInstanceId)) |
| assertEquals(Errors.FENCED_INSTANCE_ID, invalidHeartbeatResult) |
| } |
| |
| @Test |
| def shouldGetDifferentStaticMemberIdAfterEachRejoin(): Unit = { |
| val initialResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| val timeAdvance = 1 |
| var lastMemberId = initialResult.leaderId |
| for (_ <- 1 to 5) { |
| val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, |
| leaderInstanceId, protocolType, protocols, clockAdvance = timeAdvance) |
| assertTrue(joinGroupResult.memberId.startsWith(leaderInstanceId)) |
| assertNotEquals(lastMemberId, joinGroupResult.memberId) |
| lastMemberId = joinGroupResult.memberId |
| } |
| } |
| |
| @Test |
| def testOffsetCommitDeadGroup(): Unit = { |
| val memberId = "memberId" |
| |
| val deadGroupId = "deadGroupId" |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| |
| groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime())) |
| val offsetCommitResult = commitOffsets(deadGroupId, memberId, 1, Map(tip -> offset)) |
| assertEquals(Map(tip -> Errors.COORDINATOR_NOT_AVAILABLE), offsetCommitResult) |
| } |
| |
| @Test |
| def staticMemberCommitOffsetWithInvalidMemberId(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| val syncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| val validOffsetCommitResult = commitOffsets(groupId, rebalanceResult.leaderId, rebalanceResult.generation, Map(tip -> offset)) |
| assertEquals(Map(tip -> Errors.NONE), validOffsetCommitResult) |
| |
| val invalidOffsetCommitResult = commitOffsets(groupId, invalidMemberId, rebalanceResult.generation, |
| Map(tip -> offset), Some(leaderInstanceId)) |
| assertEquals(Map(tip -> Errors.FENCED_INSTANCE_ID), invalidOffsetCommitResult) |
| } |
| |
| @Test |
| def staticMemberJoinWithUnknownInstanceIdAndKnownMemberId(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, "unknown_instance", |
| protocolType, protocolSuperset, clockAdvance = 1) |
| |
| assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error) |
| } |
| |
  // Forcing the group to Empty while its static member registry still holds the follower's
  // instance id puts the coordinator in an inconsistent state; a static rejoin must then
  // surface an IllegalStateException naming both the group and the instance id.
  @Test
  def staticMemberReJoinWithIllegalStateAsUnknownMember(): Unit = {
    staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
    val group = groupCoordinator.groupManager.getGroup(groupId).get
    group.transitionTo(PreparingRebalance)
    group.transitionTo(Empty)

    // Illegal state exception shall trigger since follower id resides in pending member bucket.
    val expectedException = assertThrows(classOf[IllegalStateException],
      () => staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1))

    // The exception message should identify the offending group and instance id.
    val message = expectedException.getMessage
    assertTrue(message.contains(group.groupId))
    assertTrue(message.contains(followerInstanceId))
  }
| |
  // If the non-joined static leader also misses the final rebalance window, its session
  // expires and the stuck group falls back to Empty with a bumped generation.
  @Test
  def testLeaderFailToRejoinBeforeFinalRebalanceTimeoutWithLongSessionTimeout(): Unit = {
    groupStuckInRebalanceTimeoutDueToNonjoinedStaticMember()

    timer.advanceClock(DefaultRebalanceTimeout + 1)
    // The static leader should already session timeout, moving group towards Empty
    assertEquals(Set.empty, getGroup(groupId).allMembers)
    assertNull(getGroup(groupId).leaderOrNull)
    assertEquals(3, getGroup(groupId).generationId)
    assertGroupState(groupState = Empty)
  }
| |
  // If the static leader rejoins before the final rebalance timeout, the stuck group
  // recovers: the rebalance completes with the leader as sole member at generation 3.
  @Test
  def testLeaderRejoinBeforeFinalRebalanceTimeoutWithLongSessionTimeout(): Unit = {
    groupStuckInRebalanceTimeoutDueToNonjoinedStaticMember()

    // The static leader should be back now, moving group towards CompletingRebalance
    val leaderRejoinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocols)
    checkJoinGroupResult(leaderRejoinGroupResult,
      Errors.NONE,
      3,
      Set(leaderInstanceId),
      CompletingRebalance,
      Some(protocolType)
    )
    // The rejoined leader is now the group's only member.
    assertEquals(Set(leaderRejoinGroupResult.memberId), getGroup(groupId).allMembers)
    assertNotNull(getGroup(groupId).leaderOrNull)
    assertEquals(3, getGroup(groupId).generationId)
  }
| |
| def groupStuckInRebalanceTimeoutDueToNonjoinedStaticMember(): Unit = { |
| val longSessionTimeout = DefaultSessionTimeout * 2 |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId, sessionTimeout = longSessionTimeout) |
| |
| val dynamicJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocolSuperset, sessionTimeout = longSessionTimeout) |
| timer.advanceClock(DefaultRebalanceTimeout + 1) |
| |
| val dynamicJoinResult = await(dynamicJoinFuture, 100) |
| // The new dynamic member has been elected as leader |
| assertEquals(dynamicJoinResult.leaderId, dynamicJoinResult.memberId) |
| assertEquals(Errors.NONE, dynamicJoinResult.error) |
| assertEquals(3, dynamicJoinResult.members.size) |
| assertEquals(2, dynamicJoinResult.generationId) |
| assertGroupState(groupState = CompletingRebalance) |
| |
| assertEquals(Set(rebalanceResult.leaderId, rebalanceResult.followerId, |
| dynamicJoinResult.memberId), getGroup(groupId).allMembers) |
| assertEquals(Set(leaderInstanceId, followerInstanceId), |
| getGroup(groupId).allStaticMembers) |
| assertEquals(Set(dynamicJoinResult.memberId), getGroup(groupId).allDynamicMembers) |
| |
| // Send a special leave group request from static follower, moving group towards PreparingRebalance |
| val followerLeaveGroupResults = singleLeaveGroup(groupId, rebalanceResult.followerId) |
| verifyLeaveGroupResult(followerLeaveGroupResults) |
| assertGroupState(groupState = PreparingRebalance) |
| |
| timer.advanceClock(DefaultRebalanceTimeout + 1) |
| // Only static leader is maintained, and group is stuck at PreparingRebalance stage |
| assertTrue(getGroup(groupId).allDynamicMembers.isEmpty) |
| assertEquals(Set(rebalanceResult.leaderId), getGroup(groupId).allMembers) |
| assertTrue(getGroup(groupId).allDynamicMembers.isEmpty) |
| assertEquals(2, getGroup(groupId).generationId) |
| assertGroupState(groupState = PreparingRebalance) |
| } |
| |
  @Test
  def testStaticMemberFollowerFailToRejoinBeforeRebalanceTimeout(): Unit = {
    // Increase session timeout so that the follower won't be evicted when rebalance timeout is reached.
    val initialRebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId, sessionTimeout = DefaultRebalanceTimeout * 2)

    val newMemberInstanceId = "newMember"

    val leaderId = initialRebalanceResult.leaderId

    // A new static member joining a stable group kicks off a rebalance.
    val newMemberJoinGroupFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType,
      protocolSuperset, Some(newMemberInstanceId))
    assertGroupState(groupState = PreparingRebalance)

    // The leader rejoins, then the clock advances past the rebalance timeout. The follower
    // never rejoined, but with its long session timeout it is kept in the new generation.
    val leaderRejoinGroupResult = staticJoinGroup(groupId, leaderId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = DefaultRebalanceTimeout + 1)
    checkJoinGroupResult(leaderRejoinGroupResult,
      Errors.NONE,
      initialRebalanceResult.generation + 1,
      Set(leaderInstanceId, followerInstanceId, newMemberInstanceId),
      CompletingRebalance,
      Some(protocolType),
      expectedLeaderId = leaderId,
      expectedMemberId = leaderId)

    // The pending new member's join completes in the same generation; as a non-leader
    // it receives no member list (Set.empty) but the same leader id.
    val newMemberJoinGroupResult = Await.result(newMemberJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
    assertEquals(Errors.NONE, newMemberJoinGroupResult.error)
    checkJoinGroupResult(newMemberJoinGroupResult,
      Errors.NONE,
      initialRebalanceResult.generation + 1,
      Set.empty,
      CompletingRebalance,
      Some(protocolType),
      expectedLeaderId = leaderId)
  }
| |
  @Test
  def testStaticMemberLeaderFailToRejoinBeforeRebalanceTimeout(): Unit = {
    // Increase session timeout so that the leader won't be evicted when rebalance timeout is reached.
    val initialRebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId, sessionTimeout = DefaultRebalanceTimeout * 2)

    val newMemberInstanceId = "newMember"

    // A new static member joining the stable group triggers a rebalance.
    val newMemberJoinGroupFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType,
      protocolSuperset, Some(newMemberInstanceId))
    timer.advanceClock(1)
    assertGroupState(groupState = PreparingRebalance)

    // Only the old follower rejoins before the rebalance timeout; the old leader does not.
    val oldFollowerRejoinGroupResult = staticJoinGroup(groupId, initialRebalanceResult.followerId, followerInstanceId, protocolType, protocolSuperset, clockAdvance = DefaultRebalanceTimeout + 1)
    val newMemberJoinGroupResult = Await.result(newMemberJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS))

    // Leadership moves to whichever of the two rejoined members was elected; figure out
    // which result belongs to the new leader before asserting.
    val (newLeaderResult, newFollowerResult) = if (oldFollowerRejoinGroupResult.leaderId == oldFollowerRejoinGroupResult.memberId)
      (oldFollowerRejoinGroupResult, newMemberJoinGroupResult)
    else
      (newMemberJoinGroupResult, oldFollowerRejoinGroupResult)

    // The non-rejoined old leader is still listed (long session timeout keeps it in the group).
    checkJoinGroupResult(newLeaderResult,
      Errors.NONE,
      initialRebalanceResult.generation + 1,
      Set(leaderInstanceId, followerInstanceId, newMemberInstanceId),
      CompletingRebalance,
      Some(protocolType))

    // The follower gets an empty member list but learns the new leader's id.
    checkJoinGroupResult(newFollowerResult,
      Errors.NONE,
      initialRebalanceResult.generation + 1,
      Set.empty,
      CompletingRebalance,
      Some(protocolType),
      expectedLeaderId = newLeaderResult.memberId)
  }
| |
| @Test |
| def testJoinGroupProtocolTypeIsNotProvidedWhenAnErrorOccurs(): Unit = { |
| // JoinGroup(leader) |
| val leaderResponseFuture = sendJoinGroup(groupId, "fake-id", protocolType, |
| protocolSuperset, Some(leaderInstanceId), DefaultSessionTimeout) |
| |
| // The Protocol Type is None when there is an error |
| val leaderJoinGroupResult = await(leaderResponseFuture, 1) |
| assertEquals(Errors.UNKNOWN_MEMBER_ID, leaderJoinGroupResult.error) |
| assertEquals(None, leaderJoinGroupResult.protocolType) |
| } |
| |
  @Test
  def testJoinGroupReturnsTheProtocolType(): Unit = {
    // JoinGroup(leader)
    val leaderResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType,
      protocolSuperset, Some(leaderInstanceId), DefaultSessionTimeout)

    // JoinGroup(follower)
    val followerResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType,
      protocolSuperset, Some(followerInstanceId), DefaultSessionTimeout)

    // First advance completes the initial rebalance delay, second completes the join phase.
    timer.advanceClock(GroupInitialRebalanceDelay + 1)
    timer.advanceClock(DefaultRebalanceTimeout + 1)

    // The protocol type is defined when there is no error
    val leaderJoinGroupResult = await(leaderResponseFuture, 1)
    assertEquals(Errors.NONE, leaderJoinGroupResult.error)
    assertEquals(protocolType, leaderJoinGroupResult.protocolType.orNull)

    // The protocol type is defined when there is no error
    val followerJoinGroupResult = await(followerResponseFuture, 1)
    assertEquals(Errors.NONE, followerJoinGroupResult.error)
    assertEquals(protocolType, followerJoinGroupResult.protocolType.orNull)
  }
| |
| @Test |
| def testSyncGroupReturnsAnErrorWhenProtocolTypeIsInconsistent(): Unit = { |
| testSyncGroupProtocolTypeAndNameWith(Some("whatever"), None, Errors.INCONSISTENT_GROUP_PROTOCOL, |
| None, None) |
| } |
| |
| @Test |
| def testSyncGroupReturnsAnErrorWhenProtocolNameIsInconsistent(): Unit = { |
| testSyncGroupProtocolTypeAndNameWith(None, Some("whatever"), Errors.INCONSISTENT_GROUP_PROTOCOL, |
| None, None) |
| } |
| |
| @Test |
| def testSyncGroupSucceedWhenProtocolTypeAndNameAreNotProvided(): Unit = { |
| testSyncGroupProtocolTypeAndNameWith(None, None, Errors.NONE, |
| Some(protocolType), Some(protocolName)) |
| } |
| |
| @Test |
| def testSyncGroupSucceedWhenProtocolTypeAndNameAreConsistent(): Unit = { |
| testSyncGroupProtocolTypeAndNameWith(Some(protocolType), Some(protocolName), |
| Errors.NONE, Some(protocolType), Some(protocolName)) |
| } |
| |
  /**
   * Builds a stable two-member group, then issues leader and follower SyncGroup requests
   * carrying the given protocol type/name, asserting the expected error and echoed values.
   * Note: the `protocolType`/`protocolName` parameters shadow the class-level fields,
   * hence the explicit `this.protocolType` in the join requests below.
   */
  private def testSyncGroupProtocolTypeAndNameWith(protocolType: Option[String],
                                                   protocolName: Option[String],
                                                   expectedError: Errors,
                                                   expectedProtocolType: Option[String],
                                                   expectedProtocolName: Option[String]): Unit = {
    // JoinGroup(leader) with the Protocol Type of the group
    val leaderResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, this.protocolType,
      protocolSuperset, Some(leaderInstanceId), DefaultSessionTimeout)

    // JoinGroup(follower) with the Protocol Type of the group
    val followerResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, this.protocolType,
      protocolSuperset, Some(followerInstanceId), DefaultSessionTimeout)

    // First advance completes the initial rebalance delay, second completes the join phase.
    timer.advanceClock(GroupInitialRebalanceDelay + 1)
    timer.advanceClock(DefaultRebalanceTimeout + 1)

    val leaderJoinGroupResult = await(leaderResponseFuture, 1)
    val leaderId = leaderJoinGroupResult.memberId
    val generationId = leaderJoinGroupResult.generationId
    val followerJoinGroupResult = await(followerResponseFuture, 1)
    val followerId = followerJoinGroupResult.memberId

    // SyncGroup with the provided Protocol Type and Name
    val leaderSyncGroupResult = syncGroupLeader(groupId, generationId, leaderId,
      Map(leaderId -> Array.empty), protocolType, protocolName)
    assertEquals(expectedError, leaderSyncGroupResult.error)
    assertEquals(expectedProtocolType, leaderSyncGroupResult.protocolType)
    assertEquals(expectedProtocolName, leaderSyncGroupResult.protocolName)

    // SyncGroup with the provided Protocol Type and Name
    val followerSyncGroupResult = syncGroupFollower(groupId, generationId, followerId,
      protocolType, protocolName)
    assertEquals(expectedError, followerSyncGroupResult.error)
    assertEquals(expectedProtocolType, followerSyncGroupResult.protocolType)
    assertEquals(expectedProtocolName, followerSyncGroupResult.protocolName)
  }
| |
  // Outcome of a two-static-member join-and-rebalance round: the resulting generation
  // plus each member's assigned id and the assignment bytes returned by SyncGroup.
  private class RebalanceResult(val generation: Int,
                                val leaderId: String,
                                val leaderAssignment: Array[Byte],
                                val followerId: String,
                                val followerAssignment: Array[Byte])
| /** |
| * Generate static member rebalance results, including: |
| * - generation |
| * - leader id |
| * - leader assignment |
| * - follower id |
| * - follower assignment |
| */ |
| private def staticMembersJoinAndRebalance(leaderInstanceId: String, |
| followerInstanceId: String, |
| sessionTimeout: Int = DefaultSessionTimeout, |
| rebalanceTimeout: Int = DefaultRebalanceTimeout): RebalanceResult = { |
| val leaderResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, |
| protocolSuperset, Some(leaderInstanceId), sessionTimeout, rebalanceTimeout) |
| |
| val followerResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, |
| protocolSuperset, Some(followerInstanceId), sessionTimeout, rebalanceTimeout) |
| // The goal for two timer advance is to let first group initial join complete and set newMemberAdded flag to false. Next advance is |
| // to trigger the rebalance as needed for follower delayed join. One large time advance won't help because we could only populate one |
| // delayed join from purgatory and the new delayed op is created at that time and never be triggered. |
| timer.advanceClock(GroupInitialRebalanceDelay + 1) |
| timer.advanceClock(DefaultRebalanceTimeout + 1) |
| val newGeneration = 1 |
| |
| val leaderJoinGroupResult = await(leaderResponseFuture, 1) |
| assertEquals(Errors.NONE, leaderJoinGroupResult.error) |
| assertEquals(newGeneration, leaderJoinGroupResult.generationId) |
| |
| val followerJoinGroupResult = await(followerResponseFuture, 1) |
| assertEquals(Errors.NONE, followerJoinGroupResult.error) |
| assertEquals(newGeneration, followerJoinGroupResult.generationId) |
| |
| val leaderId = leaderJoinGroupResult.memberId |
| val leaderSyncGroupResult = syncGroupLeader(groupId, leaderJoinGroupResult.generationId, leaderId, Map(leaderId -> Array[Byte]())) |
| assertEquals(Errors.NONE, leaderSyncGroupResult.error) |
| assertTrue(getGroup(groupId).is(Stable)) |
| |
| val followerId = followerJoinGroupResult.memberId |
| val followerSyncGroupResult = syncGroupFollower(groupId, leaderJoinGroupResult.generationId, followerId) |
| assertEquals(Errors.NONE, followerSyncGroupResult.error) |
| assertTrue(getGroup(groupId).is(Stable)) |
| |
| new RebalanceResult(newGeneration, |
| leaderId, |
| leaderSyncGroupResult.memberAssignment, |
| followerId, |
| followerSyncGroupResult.memberAssignment) |
| } |
| |
| private def checkJoinGroupResult(joinGroupResult: JoinGroupResult, |
| expectedError: Errors, |
| expectedGeneration: Int, |
| expectedGroupInstanceIds: Set[String], |
| expectedGroupState: GroupState, |
| expectedProtocolType: Option[String], |
| expectedLeaderId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID, |
| expectedMemberId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID, |
| expectedSkipAssignment: Boolean = false): Unit = { |
| assertEquals(expectedError, joinGroupResult.error) |
| assertEquals(expectedGeneration, joinGroupResult.generationId) |
| assertEquals(expectedGroupInstanceIds.size, joinGroupResult.members.size) |
| val resultedGroupInstanceIds = joinGroupResult.members.map(member => member.groupInstanceId).toSet |
| assertEquals(expectedGroupInstanceIds, resultedGroupInstanceIds) |
| assertGroupState(groupState = expectedGroupState) |
| assertEquals(expectedProtocolType, joinGroupResult.protocolType) |
| assertEquals(expectedSkipAssignment, joinGroupResult.skipAssignment) |
| |
| if (!expectedLeaderId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID)) { |
| assertEquals(expectedLeaderId, joinGroupResult.leaderId) |
| } |
| if (!expectedMemberId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID)) { |
| assertEquals(expectedMemberId, joinGroupResult.memberId) |
| } |
| } |
| |
| @Test |
| def testHeartbeatWrongCoordinator(): Unit = { |
| val heartbeatResult = heartbeat(otherGroupId, memberId, -1) |
| assertEquals(Errors.NOT_COORDINATOR, heartbeatResult) |
| } |
| |
| @Test |
| def testHeartbeatUnknownGroup(): Unit = { |
| val heartbeatResult = heartbeat(groupId, memberId, -1) |
| assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) |
| } |
| |
| @Test |
| def testHeartbeatDeadGroup(): Unit = { |
| val memberId = "memberId" |
| |
| val deadGroupId = "deadGroupId" |
| |
| groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime())) |
| val heartbeatResult = heartbeat(deadGroupId, memberId, 1) |
| assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, heartbeatResult) |
| } |
| |
| @Test |
| def testHeartbeatEmptyGroup(): Unit = { |
| val memberId = "memberId" |
| |
| val group = new GroupMetadata(groupId, Empty, new MockTime()) |
| val member = new MemberMetadata(memberId, Some(groupInstanceId), |
| ClientId, ClientHost, DefaultRebalanceTimeout, DefaultSessionTimeout, |
| protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) |
| |
| group.add(member) |
| groupCoordinator.groupManager.addGroup(group) |
| val heartbeatResult = heartbeat(groupId, memberId, 0) |
| assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) |
| } |
| |
| @Test |
| def testHeartbeatUnknownConsumerExistingGroup(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val otherMemberId = "memberId" |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val assignedMemberId = joinGroupResult.memberId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| val heartbeatResult = heartbeat(groupId, otherMemberId, 1) |
| assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) |
| } |
| |
  @Test
  def testHeartbeatRebalanceInProgress(): Unit = {
    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID

    // Join without syncing, so the group has not yet reached Stable.
    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
    val assignedMemberId = joinGroupResult.memberId
    val joinGroupError = joinGroupResult.error
    assertEquals(Errors.NONE, joinGroupError)

    // NOTE(review): despite the test name, the expected result is NONE — presumably
    // heartbeats are accepted while the group awaits the leader's assignment; confirm
    // against the coordinator's heartbeat handling for CompletingRebalance.
    val heartbeatResult = heartbeat(groupId, assignedMemberId, 1)
    assertEquals(Errors.NONE, heartbeatResult)
  }
| |
| @Test |
| def testHeartbeatIllegalGeneration(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val assignedMemberId = joinGroupResult.memberId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| val heartbeatResult = heartbeat(groupId, assignedMemberId, 2) |
| assertEquals(Errors.ILLEGAL_GENERATION, heartbeatResult) |
| } |
| |
| @Test |
| def testValidHeartbeat(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val assignedConsumerId = joinGroupResult.memberId |
| val generationId = joinGroupResult.generationId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) |
| assertEquals(Errors.NONE, heartbeatResult) |
| } |
| |
  @Test
  def testSessionTimeout(): Unit = {
    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID

    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
    val assignedConsumerId = joinGroupResult.memberId
    val generationId = joinGroupResult.generationId
    val joinGroupError = joinGroupResult.error
    assertEquals(Errors.NONE, joinGroupError)

    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
    assertEquals(Errors.NONE, syncGroupResult.error)

    // Stub the offsets-topic partition lookups performed when the expired member is
    // removed and the updated group metadata is appended.
    when(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)))
      .thenReturn(HostedPartition.None)
    when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))

    // Advance past the session timeout so the member is expired from the group.
    timer.advanceClock(DefaultSessionTimeout + 100)

    // The expired member's heartbeat must now be rejected.
    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
  }
| |
| @Test |
| def testHeartbeatMaintainsSession(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val sessionTimeout = 1000 |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols, |
| rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout) |
| val assignedConsumerId = joinGroupResult.memberId |
| val generationId = joinGroupResult.generationId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| timer.advanceClock(sessionTimeout / 2) |
| |
| var heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) |
| assertEquals(Errors.NONE, heartbeatResult) |
| |
| timer.advanceClock(sessionTimeout / 2 + 100) |
| |
| heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) |
| assertEquals(Errors.NONE, heartbeatResult) |
| } |
| |
| @Test |
| def testCommitMaintainsSession(): Unit = { |
| val sessionTimeout = 1000 |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols, |
| rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout) |
| val assignedMemberId = joinGroupResult.memberId |
| val generationId = joinGroupResult.generationId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| timer.advanceClock(sessionTimeout / 2) |
| |
| val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tip -> offset)) |
| assertEquals(Map(tip -> Errors.NONE), commitOffsetResult) |
| |
| timer.advanceClock(sessionTimeout / 2 + 100) |
| |
| val heartbeatResult = heartbeat(groupId, assignedMemberId, 1) |
| assertEquals(Errors.NONE, heartbeatResult) |
| } |
| |
  @Test
  def testSessionTimeoutDuringRebalance(): Unit = {
    // create a group with a single member
    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
      rebalanceTimeout = 2000, sessionTimeout = 1000)
    val firstMemberId = firstJoinResult.memberId
    val firstGenerationId = firstJoinResult.generationId
    assertEquals(firstMemberId, firstJoinResult.leaderId)
    assertEquals(Errors.NONE, firstJoinResult.error)

    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
    assertEquals(Errors.NONE, firstSyncResult.error)

    // now have a new member join to trigger a rebalance
    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)

    timer.advanceClock(500)

    // A heartbeat carrying the old generation tells the member a rebalance is underway.
    var heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)

    // letting the session expire should make the member fall out of the group
    timer.advanceClock(1100)

    heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)

    // and the rebalance should complete with only the new member
    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
    assertEquals(Errors.NONE, otherJoinResult.error)
  }
| |
  @Test
  def testRebalanceCompletesBeforeMemberJoins(): Unit = {
    // create a group with a single static member, whose session timeout (1000) is shorter
    // than the dynamic member's, so later clock advances expire it first
    val firstJoinResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocols,
      rebalanceTimeout = 1200, sessionTimeout = 1000)
    val firstMemberId = firstJoinResult.memberId
    val firstGenerationId = firstJoinResult.generationId
    assertEquals(firstMemberId, firstJoinResult.leaderId)
    assertEquals(Errors.NONE, firstJoinResult.error)

    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
    assertEquals(Errors.NONE, firstSyncResult.error)

    // now have a new member join to trigger a rebalance
    val otherMemberSessionTimeout = DefaultSessionTimeout
    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)

    // send a couple heartbeats to keep the member alive while the rebalance finishes
    var expectedResultList = List(Errors.REBALANCE_IN_PROGRESS, Errors.REBALANCE_IN_PROGRESS)
    for (expectedResult <- expectedResultList) {
      timer.advanceClock(otherMemberSessionTimeout)
      val heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
      assertEquals(expectedResult, heartbeatResult)
    }

    // now timeout the rebalance
    timer.advanceClock(otherMemberSessionTimeout)
    val otherJoinResult = await(otherJoinFuture, otherMemberSessionTimeout+100)
    val otherMemberId = otherJoinResult.memberId
    val otherGenerationId = otherJoinResult.generationId
    val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
    assertEquals(Errors.NONE, syncResult.error)

    // the static member that did not rejoin should remain in the group until its session
    // times out; its heartbeat still carries the old generation
    assertEquals(Errors.NONE, otherJoinResult.error)
    var heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
    assertEquals(Errors.ILLEGAL_GENERATION, heartbeatResult)

    expectedResultList = List(Errors.NONE, Errors.NONE, Errors.REBALANCE_IN_PROGRESS)

    // now session timeout the unjoined member. Still keeping the new member.
    // (the final REBALANCE_IN_PROGRESS reflects the rebalance caused by the eviction)
    for (expectedResult <- expectedResultList) {
      timer.advanceClock(otherMemberSessionTimeout)
      heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId)
      assertEquals(expectedResult, heartbeatResult)
    }

    val otherRejoinGroupFuture = sendJoinGroup(groupId, otherMemberId, protocolType, protocols)
    val otherReJoinResult = await(otherRejoinGroupFuture, otherMemberSessionTimeout+100)
    assertEquals(Errors.NONE, otherReJoinResult.error)

    val otherRejoinGenerationId = otherReJoinResult.generationId
    val reSyncResult = syncGroupLeader(groupId, otherRejoinGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
    assertEquals(Errors.NONE, reSyncResult.error)

    // the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while
    // to verify that no new rebalance is triggered unexpectedly
    for ( _ <- 1 to 20) {
      timer.advanceClock(500)
      heartbeatResult = heartbeat(groupId, otherMemberId, otherRejoinGenerationId)
      assertEquals(Errors.NONE, heartbeatResult)
    }
  }
| |
| @Test |
| def testSyncGroupEmptyAssignment(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val assignedConsumerId = joinGroupResult.memberId |
| val generationId = joinGroupResult.generationId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map()) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| assertTrue(syncGroupResult.memberAssignment.isEmpty) |
| |
| val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) |
| assertEquals(Errors.NONE, heartbeatResult) |
| } |
| |
| @Test |
| def testSyncGroupNotCoordinator(): Unit = { |
| val generation = 1 |
| |
| val syncGroupResult = syncGroupFollower(otherGroupId, generation, memberId) |
| assertEquals(Errors.NOT_COORDINATOR, syncGroupResult.error) |
| } |
| |
| @Test |
| def testSyncGroupFromUnknownGroup(): Unit = { |
| val syncGroupResult = syncGroupFollower(groupId, 1, memberId) |
| assertEquals(Errors.UNKNOWN_MEMBER_ID, syncGroupResult.error) |
| } |
| |
| @Test |
| def testSyncGroupFromUnknownMember(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val assignedConsumerId = joinGroupResult.memberId |
| val generationId = joinGroupResult.generationId |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| |
| val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) |
| val syncGroupError = syncGroupResult.error |
| assertEquals(Errors.NONE, syncGroupError) |
| |
| val unknownMemberId = "blah" |
| val unknownMemberSyncResult = syncGroupFollower(groupId, generationId, unknownMemberId) |
| assertEquals(Errors.UNKNOWN_MEMBER_ID, unknownMemberSyncResult.error) |
| } |
| |
| @Test |
| def testSyncGroupFromIllegalGeneration(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val assignedConsumerId = joinGroupResult.memberId |
| val generationId = joinGroupResult.generationId |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| |
| // send the sync group with an invalid generation |
| val syncGroupResult = syncGroupLeader(groupId, generationId+1, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) |
| assertEquals(Errors.ILLEGAL_GENERATION, syncGroupResult.error) |
| } |
| |
| @Test |
| def testJoinGroupFromUnchangedFollowerDoesNotRebalance(): Unit = { |
| // to get a group of two members: |
| // 1. join and sync with a single member (because we can't immediately join with two members) |
| // 2. join and sync with the first member and a new member |
| |
| val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| val firstMemberId = firstJoinResult.memberId |
| val firstGenerationId = firstJoinResult.generationId |
| assertEquals(firstMemberId, firstJoinResult.leaderId) |
| assertEquals(Errors.NONE, firstJoinResult.error) |
| |
| val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, firstSyncResult.error) |
| |
| val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| |
| val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols) |
| |
| val joinResult = await(joinFuture, DefaultSessionTimeout+100) |
| val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) |
| assertEquals(Errors.NONE, joinResult.error) |
| assertEquals(Errors.NONE, otherJoinResult.error) |
| assertTrue(joinResult.generationId == otherJoinResult.generationId) |
| |
| assertEquals(firstMemberId, joinResult.leaderId) |
| assertEquals(firstMemberId, otherJoinResult.leaderId) |
| |
| val nextGenerationId = joinResult.generationId |
| |
| // this shouldn't cause a rebalance since protocol information hasn't changed |
| val followerJoinResult = await(sendJoinGroup(groupId, otherJoinResult.memberId, protocolType, protocols), 1) |
| |
| assertEquals(Errors.NONE, followerJoinResult.error) |
| assertEquals(nextGenerationId, followerJoinResult.generationId) |
| } |
| |
| @Test |
| def testJoinGroupFromUnchangedLeaderShouldRebalance(): Unit = { |
| val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| val firstMemberId = firstJoinResult.memberId |
| val firstGenerationId = firstJoinResult.generationId |
| assertEquals(firstMemberId, firstJoinResult.leaderId) |
| assertEquals(Errors.NONE, firstJoinResult.error) |
| |
| val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, firstSyncResult.error) |
| |
| // join groups from the leader should force the group to rebalance, which allows the |
| // leader to push new assignments when local metadata changes |
| |
| val secondJoinResult = await(sendJoinGroup(groupId, firstMemberId, protocolType, protocols), 1) |
| |
| assertEquals(Errors.NONE, secondJoinResult.error) |
| assertNotEquals(firstGenerationId, secondJoinResult.generationId) |
| } |
| |
| /** |
| * Test if the following scenario completes a rebalance correctly: A new member starts a JoinGroup request with |
| * an UNKNOWN_MEMBER_ID, attempting to join a stable group. But never initiates the second JoinGroup request with |
| * the provided member ID and times out. The test checks if original member remains the sole member in this group, |
| * which should remain stable throughout this test. |
| */ |
| @Test |
| def testSecondMemberPartiallyJoinAndTimeout(): Unit = { |
| val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| val firstMemberId = firstJoinResult.memberId |
| val firstGenerationId = firstJoinResult.generationId |
| assertEquals(firstMemberId, firstJoinResult.leaderId) |
| assertEquals(Errors.NONE, firstJoinResult.error) |
| |
| // Starting sync group leader |
| val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, firstSyncResult.error) |
| timer.advanceClock(100) |
| assertEquals(Set(firstMemberId), groupCoordinator.groupManager.getGroup(groupId).get.allMembers) |
| assertEquals(groupCoordinator.groupManager.getGroup(groupId).get.allMembers, |
| groupCoordinator.groupManager.getGroup(groupId).get.allDynamicMembers) |
| assertEquals(0, groupCoordinator.groupManager.getGroup(groupId).get.numPending) |
| val group = groupCoordinator.groupManager.getGroup(groupId).get |
| |
| // ensure the group is stable before a new member initiates join request |
| assertEquals(Stable, group.currentState) |
| |
| // new member initiates join group |
| val secondJoinResult = joinGroupPartial(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| assertEquals(Errors.MEMBER_ID_REQUIRED, secondJoinResult.error) |
| assertEquals(1, group.numPending) |
| assertEquals(Stable, group.currentState) |
| |
| when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) |
| |
| // advance clock to timeout the pending member |
| assertEquals(Set(firstMemberId), group.allMembers) |
| assertEquals(1, group.numPending) |
| timer.advanceClock(300) |
| |
| // original (firstMember) member sends heartbeats to prevent session timeouts. |
| val heartbeatResult = heartbeat(groupId, firstMemberId, 1) |
| assertEquals(Errors.NONE, heartbeatResult) |
| |
| // timeout the pending member |
| timer.advanceClock(300) |
| |
| // at this point the second member should have been removed from pending list (session timeout), |
| // and the group should be in Stable state with only the first member in it. |
| assertEquals(Set(firstMemberId), group.allMembers) |
| assertEquals(0, group.numPending) |
| assertEquals(Stable, group.currentState) |
| assertTrue(group.has(firstMemberId)) |
| } |
| |
| /** |
| * Create a group with two members in Stable state. Create a third pending member by completing it's first JoinGroup |
| * request without a member id. |
| */ |
| private def setupGroupWithPendingMember(): JoinGroupResult = { |
| // add the first member |
| val joinResult1 = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| assertGroupState(groupState = CompletingRebalance) |
| |
| // now the group is stable, with the one member that joined above |
| val firstSyncResult = syncGroupLeader(groupId, joinResult1.generationId, joinResult1.memberId, Map(joinResult1.memberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, firstSyncResult.error) |
| assertGroupState(groupState = Stable) |
| |
| // start the join for the second member |
| val secondJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| |
| // rejoin the first member back into the group |
| val firstJoinFuture = sendJoinGroup(groupId, joinResult1.memberId, protocolType, protocols) |
| val firstMemberJoinResult = await(firstJoinFuture, DefaultSessionTimeout+100) |
| val secondMemberJoinResult = await(secondJoinFuture, DefaultSessionTimeout+100) |
| assertGroupState(groupState = CompletingRebalance) |
| |
| // stabilize the group |
| val secondSyncResult = syncGroupLeader(groupId, firstMemberJoinResult.generationId, joinResult1.memberId, Map(joinResult1.memberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, secondSyncResult.error) |
| assertGroupState(groupState = Stable) |
| |
| // re-join an existing member, to transition the group to PreparingRebalance state. |
| sendJoinGroup(groupId, firstMemberJoinResult.memberId, protocolType, protocols) |
| assertGroupState(groupState = PreparingRebalance) |
| |
| // create a pending member in the group |
| val pendingMember = joinGroupPartial(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, sessionTimeout=100) |
| assertEquals(1, groupCoordinator.groupManager.getGroup(groupId).get.numPending) |
| |
| // re-join the second existing member |
| sendJoinGroup(groupId, secondMemberJoinResult.memberId, protocolType, protocols) |
| assertGroupState(groupState = PreparingRebalance) |
| assertEquals(1, groupCoordinator.groupManager.getGroup(groupId).get.numPending) |
| |
| pendingMember |
| } |
| |
| /** |
| * Setup a group in with a pending member. The test checks if the a pending member joining completes the rebalancing |
| * operation |
| */ |
| @Test |
| def testJoinGroupCompletionWhenPendingMemberJoins(): Unit = { |
| val pendingMember = setupGroupWithPendingMember() |
| |
| // compete join group for the pending member |
| val pendingMemberJoinFuture = sendJoinGroup(groupId, pendingMember.memberId, protocolType, protocols) |
| await(pendingMemberJoinFuture, DefaultSessionTimeout+100) |
| |
| assertGroupState(groupState = CompletingRebalance) |
| assertEquals(3, group().allMembers.size) |
| assertEquals(0, group().numPending) |
| } |
| |
| /** |
| * Setup a group in with a pending member. The test checks if the timeout of the pending member will |
| * cause the group to return to a CompletingRebalance state. |
| */ |
| @Test |
| def testJoinGroupCompletionWhenPendingMemberTimesOut(): Unit = { |
| setupGroupWithPendingMember() |
| |
| // Advancing Clock by > 100 (session timeout for third and fourth member) |
| // and < 500 (for first and second members). This will force the coordinator to attempt join |
| // completion on heartbeat expiration (since we are in PendingRebalance stage). |
| when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) |
| timer.advanceClock(120) |
| |
| assertGroupState(groupState = CompletingRebalance) |
| assertEquals(2, group().allMembers.size) |
| assertEquals(0, group().numPending) |
| } |
| |
| @Test |
| def testPendingMembersLeavesGroup(): Unit = { |
| val pending = setupGroupWithPendingMember() |
| |
| val leaveGroupResults = singleLeaveGroup(groupId, pending.memberId) |
| verifyLeaveGroupResult(leaveGroupResults) |
| |
| assertGroupState(groupState = CompletingRebalance) |
| assertEquals(2, group().allMembers.size) |
| assertEquals(2, group().allDynamicMembers.size) |
| assertEquals(0, group().numPending) |
| } |
| |
| private def verifyHeartbeat( |
| joinGroupResult: JoinGroupResult, |
| expectedError: Errors |
| ): Unit = { |
| val heartbeatResult = heartbeat( |
| groupId, |
| joinGroupResult.memberId, |
| joinGroupResult.generationId |
| ) |
| assertEquals(expectedError, heartbeatResult) |
| } |
| |
| private def joinWithNMembers(nbMembers: Int): Seq[JoinGroupResult] = { |
| val requiredKnownMemberId = true |
| |
| // First JoinRequests |
| var futures = 1.to(nbMembers).map { _ => |
| sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, |
| None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId) |
| } |
| |
| // Get back the assigned member ids |
| val memberIds = futures.map(await(_, 1).memberId) |
| |
| // Second JoinRequests |
| futures = memberIds.map { memberId => |
| sendJoinGroup(groupId, memberId, protocolType, protocols, |
| None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId) |
| } |
| |
| timer.advanceClock(GroupInitialRebalanceDelay + 1) |
| timer.advanceClock(DefaultRebalanceTimeout + 1) |
| |
| futures.map(await(_, 1)) |
| } |
| |
  @Test
  def testRebalanceTimesOutWhenSyncRequestIsNotReceived(): Unit = {
    // This test case ensures that the DelayedSync does kick out all members
    // if they don't send a sync request before the rebalance timeout. The
    // group is in the Stable state in this case.
    val results = joinWithNMembers(nbMembers = 3)
    assertEquals(Set(Errors.NONE), results.map(_.error).toSet)

    // Advance time
    timer.advanceClock(DefaultRebalanceTimeout / 2)

    // Heartbeats to ensure that heartbeating does not interfere with the
    // delayed sync operation.
    results.foreach { joinGroupResult =>
      verifyHeartbeat(joinGroupResult, Errors.NONE)
    }

    // Stub the magic value lookup so the coordinator can write to the group
    // metadata topic when members are removed.
    when(replicaManager.getMagic(any[TopicPartition]))
      .thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))

    // Advance past the rebalance timeout to trigger the delayed operation.
    timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)

    // Heartbeats fail because none of the members have sent the sync request
    results.foreach { joinGroupResult =>
      verifyHeartbeat(joinGroupResult, Errors.UNKNOWN_MEMBER_ID)
    }
  }
| |
  @Test
  def testRebalanceTimesOutWhenSyncRequestIsNotReceivedFromFollowers(): Unit = {
    // This test case ensures that the DelayedSync does kick out the followers
    // if they don't send a sync request before the rebalance timeout. The
    // group is in the Stable state in this case.
    val results = joinWithNMembers(nbMembers = 3)
    assertEquals(Set(Errors.NONE), results.map(_.error).toSet)

    // Advance time
    timer.advanceClock(DefaultRebalanceTimeout / 2)

    // Heartbeats to ensure that heartbeating does not interfere with the
    // delayed sync operation.
    results.foreach { joinGroupResult =>
      verifyHeartbeat(joinGroupResult, Errors.NONE)
    }

    // Leader sends Sync
    val assignments = results.map(result => result.memberId -> Array.empty[Byte]).toMap
    val leaderResult = sendSyncGroupLeader(groupId, results.head.generationId, results.head.memberId,
      Some(protocolType), Some(protocolName), None, assignments)

    assertEquals(Errors.NONE, await(leaderResult, 1).error)

    // Leader should be able to heartbeat
    verifyHeartbeat(results.head, Errors.NONE)

    // Advance past the rebalance timeout to trigger the delayed operation.
    timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)

    // Leader should be able to heartbeat, though a rebalance is now in progress
    // because the followers were kicked out.
    verifyHeartbeat(results.head, Errors.REBALANCE_IN_PROGRESS)

    // Followers should have been removed.
    results.tail.foreach { joinGroupResult =>
      verifyHeartbeat(joinGroupResult, Errors.UNKNOWN_MEMBER_ID)
    }
  }
| |
  @Test
  def testRebalanceTimesOutWhenSyncRequestIsNotReceivedFromLeaders(): Unit = {
    // This test case ensures that the DelayedSync does kick out the leader
    // if it does not send a sync request before the rebalance timeout. The
    // group is in the CompletingRebalance state in this case.
    val results = joinWithNMembers(nbMembers = 3)
    assertEquals(Set(Errors.NONE), results.map(_.error).toSet)

    // Advance time
    timer.advanceClock(DefaultRebalanceTimeout / 2)

    // Heartbeats to ensure that heartbeating does not interfere with the
    // delayed sync operation.
    results.foreach { joinGroupResult =>
      verifyHeartbeat(joinGroupResult, Errors.NONE)
    }

    // Followers send Sync
    val followerResults = results.tail.map { joinGroupResult =>
      sendSyncGroupFollower(groupId, joinGroupResult.generationId, joinGroupResult.memberId,
        Some(protocolType), Some(protocolName), None)
    }

    // Advance past the rebalance timeout to trigger the delayed operation.
    timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)

    // The followers' pending syncs complete with REBALANCE_IN_PROGRESS since the
    // leader never delivered an assignment.
    val followerErrors = followerResults.map(await(_, 1).error)
    assertEquals(Set(Errors.REBALANCE_IN_PROGRESS), followerErrors.toSet)

    // Leader should have been removed.
    verifyHeartbeat(results.head, Errors.UNKNOWN_MEMBER_ID)

    // Followers should be able to heartbeat.
    results.tail.foreach { joinGroupResult =>
      verifyHeartbeat(joinGroupResult, Errors.REBALANCE_IN_PROGRESS)
    }
  }
| |
  @Test
  def testRebalanceDoesNotTimeOutWhenAllSyncAreReceived(): Unit = {
    // This test case ensures that the DelayedSync does not kick any
    // members out when they have all sent their sync requests.
    val results = joinWithNMembers(nbMembers = 3)
    assertEquals(Set(Errors.NONE), results.map(_.error).toSet)

    // Advance time
    timer.advanceClock(DefaultRebalanceTimeout / 2)

    // Heartbeats to ensure that heartbeating does not interfere with the
    // delayed sync operation.
    results.foreach { joinGroupResult =>
      verifyHeartbeat(joinGroupResult, Errors.NONE)
    }

    // Leader sends Sync with the assignment for every member
    val assignments = results.map(result => result.memberId -> Array.empty[Byte]).toMap
    val leaderResult = sendSyncGroupLeader(groupId, results.head.generationId, results.head.memberId,
      Some(protocolType), Some(protocolName), None, assignments)

    assertEquals(Errors.NONE, await(leaderResult, 1).error)

    // Followers send Sync
    val followerResults = results.tail.map { joinGroupResult =>
      sendSyncGroupFollower(groupId, joinGroupResult.generationId, joinGroupResult.memberId,
        Some(protocolType), Some(protocolName), None)
    }

    val followerErrors = followerResults.map(await(_, 1).error)
    assertEquals(Set(Errors.NONE), followerErrors.toSet)

    // Advance past the rebalance timeout to expire the sync timeout. All
    // members should remain and the group should not rebalance.
    timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)

    // All members should be able to heartbeat.
    results.foreach { joinGroupResult =>
      verifyHeartbeat(joinGroupResult, Errors.NONE)
    }

    // Advance a bit more.
    timer.advanceClock(DefaultRebalanceTimeout / 2)

    // All members should still be able to heartbeat.
    results.foreach { joinGroupResult =>
      verifyHeartbeat(joinGroupResult, Errors.NONE)
    }
  }
| |
| private def group(groupId: String = groupId) = { |
| groupCoordinator.groupManager.getGroup(groupId) match { |
| case Some(g) => g |
| case None => null |
| } |
| } |
| |
| private def assertGroupState(groupId: String = groupId, |
| groupState: GroupState): Unit = { |
| groupCoordinator.groupManager.getGroup(groupId) match { |
| case Some(group) => assertEquals(groupState, group.currentState) |
| case None => fail(s"Group $groupId not found in coordinator") |
| } |
| } |
| |
  /**
   * Send a JoinGroup that requires a known member id and block for the result.
   * When `memberId` is UNKNOWN_MEMBER_ID the coordinator answers with
   * MEMBER_ID_REQUIRED and the member becomes "pending" until it rejoins with
   * the assigned id (see callers asserting numPending).
   */
  private def joinGroupPartial(groupId: String,
                               memberId: String,
                               protocolType: String,
                               protocols: List[(String, Array[Byte])],
                               sessionTimeout: Int = DefaultSessionTimeout,
                               rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
    val requireKnownMemberId = true
    val responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, None, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
    // Await at the edge is acceptable in this synchronous test helper.
    Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
  }
| |
  @Test
  def testLeaderFailureInSyncGroup(): Unit = {
    // to get a group of two members:
    // 1. join and sync with a single member (because we can't immediately join with two members)
    // 2. join and sync with the first member and a new member

    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
    val firstMemberId = firstJoinResult.memberId
    val firstGenerationId = firstJoinResult.generationId
    assertEquals(firstMemberId, firstJoinResult.leaderId)
    assertEquals(Errors.NONE, firstJoinResult.error)

    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
    assertEquals(Errors.NONE, firstSyncResult.error)

    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)

    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols)

    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
    assertEquals(Errors.NONE, joinResult.error)
    assertEquals(Errors.NONE, otherJoinResult.error)
    // both members must land in the same generation
    assertTrue(joinResult.generationId == otherJoinResult.generationId)

    assertEquals(firstMemberId, joinResult.leaderId)
    assertEquals(firstMemberId, otherJoinResult.leaderId)

    val nextGenerationId = joinResult.generationId

    // with no leader SyncGroup, the follower's request should fail with an error indicating
    // that it should rejoin
    val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId, None, None, None)

    // expire the leader's session so the group starts rebalancing
    timer.advanceClock(DefaultSessionTimeout + 100)

    val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
    assertEquals(Errors.REBALANCE_IN_PROGRESS, followerSyncResult.error)
  }
| |
  @Test
  def testSyncGroupFollowerAfterLeader(): Unit = {
    // to get a group of two members:
    // 1. join and sync with a single member (because we can't immediately join with two members)
    // 2. join and sync with the first member and a new member

    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
    val firstMemberId = firstJoinResult.memberId
    val firstGenerationId = firstJoinResult.generationId
    assertEquals(firstMemberId, firstJoinResult.leaderId)
    assertEquals(Errors.NONE, firstJoinResult.error)

    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
    assertEquals(Errors.NONE, firstSyncResult.error)

    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)

    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols)

    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
    assertEquals(Errors.NONE, joinResult.error)
    assertEquals(Errors.NONE, otherJoinResult.error)
    // both members must land in the same generation
    assertTrue(joinResult.generationId == otherJoinResult.generationId)

    assertEquals(firstMemberId, joinResult.leaderId)
    assertEquals(firstMemberId, otherJoinResult.leaderId)

    val nextGenerationId = joinResult.generationId
    val leaderId = firstMemberId
    val leaderAssignment = Array[Byte](0)
    val followerId = otherJoinResult.memberId
    val followerAssignment = Array[Byte](1)

    // leader syncs first, providing the assignments for both members
    val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
      Map(leaderId -> leaderAssignment, followerId -> followerAssignment))
    assertEquals(Errors.NONE, leaderSyncResult.error)
    assertEquals(leaderAssignment, leaderSyncResult.memberAssignment)

    // the follower's later sync immediately receives its stored assignment
    val followerSyncResult = syncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)
    assertEquals(Errors.NONE, followerSyncResult.error)
    assertEquals(followerAssignment, followerSyncResult.memberAssignment)
  }
| |
  @Test
  def testSyncGroupLeaderAfterFollower(): Unit = {
    // to get a group of two members:
    // 1. join and sync with a single member (because we can't immediately join with two members)
    // 2. join and sync with the first member and a new member

    val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
    val firstMemberId = joinGroupResult.memberId
    val firstGenerationId = joinGroupResult.generationId
    assertEquals(firstMemberId, joinGroupResult.leaderId)
    assertEquals(Errors.NONE, joinGroupResult.error)

    val syncGroupResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
    assertEquals(Errors.NONE, syncGroupResult.error)

    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)

    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols)

    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
    assertEquals(Errors.NONE, joinResult.error)
    assertEquals(Errors.NONE, otherJoinResult.error)
    // both members must land in the same generation
    assertTrue(joinResult.generationId == otherJoinResult.generationId)

    val nextGenerationId = joinResult.generationId
    val leaderId = joinResult.leaderId
    val leaderAssignment = Array[Byte](0)
    val followerId = otherJoinResult.memberId
    val followerAssignment = Array[Byte](1)

    assertEquals(firstMemberId, joinResult.leaderId)
    assertEquals(firstMemberId, otherJoinResult.leaderId)

    // the follower syncs first; its response is held until the leader provides assignments
    val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, followerId, None, None, None)

    val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
      Map(leaderId -> leaderAssignment, followerId -> followerAssignment))
    assertEquals(Errors.NONE, leaderSyncResult.error)
    assertEquals(leaderAssignment, leaderSyncResult.memberAssignment)

    // the leader's sync unblocks the follower's pending response
    val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
    assertEquals(Errors.NONE, followerSyncResult.error)
    assertEquals(followerAssignment, followerSyncResult.memberAssignment)
  }
| |
| @Test |
| def testCommitOffsetFromUnknownGroup(): Unit = { |
| val generationId = 1 |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| |
| val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tip -> offset)) |
| assertEquals(Map(tip -> Errors.ILLEGAL_GENERATION), commitOffsetResult) |
| } |
| |
| @Test |
| def testCommitOffsetWithDefaultGeneration(): Unit = { |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| |
| val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, |
| OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip -> offset)) |
| assertEquals(Map(tip -> Errors.NONE), commitOffsetResult) |
| } |
| |
  @Test
  def testCommitOffsetsAfterGroupIsEmpty(): Unit = {
    // Tests the scenario where the reset offset tool modifies the offsets
    // of a group after it becomes empty

    // A group member joins
    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
    val assignedMemberId = joinGroupResult.memberId
    val joinGroupError = joinGroupResult.error
    assertEquals(Errors.NONE, joinGroupError)

    // and leaves.
    val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId)
    verifyLeaveGroupResult(leaveGroupResults)

    // A simple offset commit (default member id / generation) should now succeed
    // against the empty group.
    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
    val offset = offsetAndMetadata(0)
    val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip -> offset))
    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)

    // and the committed offset is visible to a fetch.
    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, error)
    assertEquals(Some(0), partitionData.get(tip.topicPartition).map(_.offset))
  }
| |
  @Test
  def testFetchOffsets(): Unit = {
    // A committed offset with metadata and leader epoch must be returned
    // unchanged by a subsequent fetch.
    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
    val offset = 97L
    val metadata = "some metadata"
    val leaderEpoch = Optional.of[Integer](15)
    val offsetAndMetadata = OffsetAndMetadata(offset, leaderEpoch, metadata, timer.time.milliseconds(), None)

    val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip -> offsetAndMetadata))
    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)

    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, error)

    // offset, metadata and leader epoch all round-trip through the coordinator
    val maybePartitionData = partitionData.get(tip.topicPartition)
    assertTrue(maybePartitionData.isDefined)
    assertEquals(offset, maybePartitionData.get.offset)
    assertEquals(metadata, maybePartitionData.get.metadata)
    assertEquals(leaderEpoch, maybePartitionData.get.leaderEpoch)
  }
| |
  @Test
  def testCommitAndFetchOffsetsWithEmptyGroup(): Unit = {
    // For backwards compatibility, the coordinator supports committing/fetching offsets with an empty groupId.
    // To allow inspection and removal of the empty group, we must also support DescribeGroups and DeleteGroups

    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
    val offset = offsetAndMetadata(0)
    // shadow the class-level groupId with the empty-string group on purpose
    val groupId = ""

    val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip -> offset))
    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)

    val (fetchError, partitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, fetchError)
    assertEquals(Some(0), partitionData.get(tip.topicPartition).map(_.offset))

    // the empty-named group can be described ...
    val (describeError, summary) = groupCoordinator.handleDescribeGroup(groupId)
    assertEquals(Errors.NONE, describeError)
    assertEquals(Empty.toString, summary.state)

    // ... and deleted; stub the replica manager so the tombstone write can proceed
    val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
    val partition: Partition = mock(classOf[Partition])

    when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
    when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
    when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))

    val deleteErrors = groupCoordinator.handleDeleteGroups(Set(groupId))
    assertEquals(Errors.NONE, deleteErrors(groupId))

    // after deletion the offset is gone
    val (err, data) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, err)
    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), data.get(tip.topicPartition).map(_.offset))
  }
| |
  @Test
  def testBasicFetchTxnOffsets(): Unit = {
    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
    val offset = offsetAndMetadata(0)
    val producerId = 1000L
    val producerEpoch : Short = 2

    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tip -> offset))
    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)

    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))

    // Validate that the offset isn't materialized yet.
    assertEquals(Errors.NONE, error)
    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tip.topicPartition).map(_.offset))

    val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)

    // Send commit marker.
    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)

    // Validate that committed offset is materialized.
    val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, secondReqError)
    assertEquals(Some(0), secondReqPartitionData.get(tip.topicPartition).map(_.offset))
  }
| |
  @Test
  def testFetchTxnOffsetsWithAbort(): Unit = {
    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
    val offset = offsetAndMetadata(0)
    val producerId = 1000L
    val producerEpoch : Short = 2

    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tip -> offset))
    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)

    // the transactional commit is still pending, so nothing is visible yet
    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, error)
    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tip.topicPartition).map(_.offset))

    val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)

    // Validate that the pending commit is discarded.
    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)

    val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, secondReqError)
    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), secondReqPartitionData.get(tip.topicPartition).map(_.offset))
  }
| |
  @Test
  def testFetchPendingTxnOffsetsWithAbort(): Unit = {
    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
    val offset = offsetAndMetadata(0)
    val producerId = 1000L
    val producerEpoch : Short = 2

    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tip -> offset))
    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)

    // While the transaction is pending, the partition with the pending commit reports
    // UNSTABLE_OFFSET_COMMIT; a partition with no pending commit reports NONE.
    val nonExistTp = new TopicPartition("non-exist-topic", 0)
    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition, nonExistTp)))
    assertEquals(Errors.NONE, error)
    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tip.topicPartition).map(_.offset))
    assertEquals(Some(Errors.UNSTABLE_OFFSET_COMMIT), partitionData.get(tip.topicPartition).map(_.error))
    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(nonExistTp).map(_.offset))
    assertEquals(Some(Errors.NONE), partitionData.get(nonExistTp).map(_.error))

    val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)

    // Validate that the pending commit is discarded.
    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)

    // After the abort the partition is stable again and no offset is committed.
    val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, secondReqError)
    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), secondReqPartitionData.get(tip.topicPartition).map(_.offset))
    assertEquals(Some(Errors.NONE), secondReqPartitionData.get(tip.topicPartition).map(_.error))
  }
| |
  @Test
  def testFetchPendingTxnOffsetsWithCommit(): Unit = {
    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "offset")
    val offset = offsetAndMetadata(25)
    val producerId = 1000L
    val producerEpoch : Short = 2

    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tip -> offset))
    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)

    // While the transaction is pending, the fetch reports UNSTABLE_OFFSET_COMMIT.
    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, error)
    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tip.topicPartition).map(_.offset))
    assertEquals(Some(Errors.UNSTABLE_OFFSET_COMMIT), partitionData.get(tip.topicPartition).map(_.error))

    val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)

    // Validate that the pending commit is committed
    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)

    // The committed offset (25) is now materialized and the partition is stable.
    val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, secondReqError)
    assertEquals(Some(25), secondReqPartitionData.get(tip.topicPartition).map(_.offset))
    assertEquals(Some(Errors.NONE), secondReqPartitionData.get(tip.topicPartition).map(_.error))
  }
| |
  @Test
  def testFetchTxnOffsetsIgnoreSpuriousCommit(): Unit = {
    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
    val offset = offsetAndMetadata(0)
    val producerId = 1000L
    val producerEpoch : Short = 2

    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tip -> offset))
    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)

    // pending transactional offset is not visible yet
    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, error)
    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tip.topicPartition).map(_.offset))

    // abort discards the pending commit
    val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)

    val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, secondReqError)
    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), secondReqPartitionData.get(tip.topicPartition).map(_.offset))

    // Ignore spurious commit: a COMMIT marker arriving after the abort must not
    // resurrect the already-discarded offset.
    handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)

    val (thirdReqError, thirdReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tip.topicPartition)))
    assertEquals(Errors.NONE, thirdReqError)
    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), thirdReqPartitionData.get(tip.topicPartition).map(_.offset))
  }
| |
| @Test |
| def testFetchTxnOffsetsOneProducerMultipleGroups(): Unit = { |
| // One producer, two groups located on separate offsets topic partitions. |
| // Both group have pending offset commits. |
| // Marker for only one partition is received. That commit should be materialized while the other should not. |
| |
| val topicIdPartitions = List( |
| new TopicIdPartition(Uuid.randomUuid(), 0, "topic1"), |
| new TopicIdPartition(Uuid.randomUuid(), 0, "topic2") |
| ) |
| val offsets = List(offsetAndMetadata(10), offsetAndMetadata(15)) |
| val producerId = 1000L |
| val producerEpoch: Short = 3 |
| |
| val groupIds = List(groupId, otherGroupId) |
| val offsetTopicPartitions = List(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)), |
| new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(otherGroupId))) |
| |
| groupCoordinator.groupManager.addOwnedPartition(offsetTopicPartitions(1).partition) |
| val errors = mutable.ArrayBuffer[Errors]() |
| val partitionData = mutable.ArrayBuffer[scala.collection.Map[TopicPartition, OffsetFetchResponse.PartitionData]]() |
| |
| val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]() |
| |
| // Ensure that the two groups map to different partitions. |
| assertNotEquals(offsetTopicPartitions(0), offsetTopicPartitions(1)) |
| |
| commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(topicIdPartitions(0) -> offsets(0)))) |
| assertEquals(Errors.NONE, commitOffsetResults(0)(topicIdPartitions(0))) |
| commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, producerId, producerEpoch, Map(topicIdPartitions(1) -> offsets(1)))) |
| assertEquals(Errors.NONE, commitOffsetResults(1)(topicIdPartitions(1))) |
| |
| // We got a commit for only one __consumer_offsets partition. We should only materialize it's group offsets. |
| val topicPartitions = topicIdPartitions.map(_.topicPartition) |
| handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT) |
| groupCoordinator.handleFetchOffsets(groupIds(0), requireStable, Some(topicPartitions)) match { |
| case (error, partData) => |
| errors.append(error) |
| partitionData.append(partData) |
| case _ => |
| } |
| |
| groupCoordinator.handleFetchOffsets(groupIds(1), requireStable, Some(topicPartitions)) match { |
| case (error, partData) => |
| errors.append(error) |
| partitionData.append(partData) |
| case _ => |
| } |
| |
| assertEquals(2, errors.size) |
| assertEquals(Errors.NONE, errors(0)) |
| assertEquals(Errors.NONE, errors(1)) |
| |
| // Exactly one offset commit should have been materialized. |
| assertEquals(Some(offsets(0).offset), partitionData(0).get(topicPartitions(0)).map(_.offset)) |
| assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(0).get(topicPartitions(1)).map(_.offset)) |
| assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(1).get(topicPartitions(0)).map(_.offset)) |
| assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(1).get(topicPartitions(1)).map(_.offset)) |
| |
| // Now we receive the other marker. |
| handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT) |
| errors.clear() |
| partitionData.clear() |
| groupCoordinator.handleFetchOffsets(groupIds(0), requireStable, Some(topicPartitions)) match { |
| case (error, partData) => |
| errors.append(error) |
| partitionData.append(partData) |
| case _ => |
| } |
| |
| groupCoordinator.handleFetchOffsets(groupIds(1), requireStable, Some(topicPartitions)) match { |
| case (error, partData) => |
| errors.append(error) |
| partitionData.append(partData) |
| case _ => |
| } |
| // Two offsets should have been materialized |
| assertEquals(Some(offsets(0).offset), partitionData(0).get(topicPartitions(0)).map(_.offset)) |
| assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(0).get(topicPartitions(1)).map(_.offset)) |
| assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(1).get(topicPartitions(0)).map(_.offset)) |
| assertEquals(Some(offsets(1).offset), partitionData(1).get(topicPartitions(1)).map(_.offset)) |
| } |
| |
| @Test |
| def testFetchTxnOffsetsMultipleProducersOneGroup(): Unit = { |
| // One group, two producers |
| // Different producers will commit offsets for different partitions. |
| // Each partition's offsets should be materialized when the corresponding producer's marker is received. |
| |
| val topicIdPartitions = List( |
| new TopicIdPartition(Uuid.randomUuid(), 0, "topic1"), |
| new TopicIdPartition(Uuid.randomUuid(), 0, "topic2") |
| ) |
| val offsets = List(offsetAndMetadata(10), offsetAndMetadata(15)) |
| val producerIds = List(1000L, 1005L) |
| val producerEpochs: Seq[Short] = List(3, 4) |
| |
| val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) |
| |
| val errors = mutable.ArrayBuffer[Errors]() |
| val partitionData = mutable.ArrayBuffer[scala.collection.Map[TopicPartition, OffsetFetchResponse.PartitionData]]() |
| |
| val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]() |
| |
| // producer0 commits the offsets for partition0 |
| commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0), Map(topicIdPartitions(0) -> offsets(0)))) |
| assertEquals(Errors.NONE, commitOffsetResults(0)(topicIdPartitions(0))) |
| |
| // producer1 commits the offsets for partition1 |
| commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1), Map(topicIdPartitions(1) -> offsets(1)))) |
| assertEquals(Errors.NONE, commitOffsetResults(1)(topicIdPartitions(1))) |
| |
| // producer0 commits its transaction. |
| val topicPartitions = topicIdPartitions.map(_.topicPartition) |
| handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT) |
| groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(topicPartitions)) match { |
| case (error, partData) => |
| errors.append(error) |
| partitionData.append(partData) |
| case _ => |
| } |
| |
| assertEquals(Errors.NONE, errors(0)) |
| |
| // We should only see the offset commit for producer0 |
| assertEquals(Some(offsets(0).offset), partitionData(0).get(topicPartitions(0)).map(_.offset)) |
| assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(0).get(topicPartitions(1)).map(_.offset)) |
| |
| // producer1 now commits its transaction. |
| handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT) |
| |
| groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(topicPartitions)) match { |
| case (error, partData) => |
| errors.append(error) |
| partitionData.append(partData) |
| case _ => |
| } |
| |
| assertEquals(Errors.NONE, errors(1)) |
| |
| // We should now see the offset commits for both producers. |
| assertEquals(Some(offsets(0).offset), partitionData(1).get(topicPartitions(0)).map(_.offset)) |
| assertEquals(Some(offsets(1).offset), partitionData(1).get(topicPartitions(1)).map(_.offset)) |
| } |
| |
| @Test |
| def testFetchOffsetForUnknownPartition(): Unit = { |
| val tp = new TopicPartition("topic", 0) |
| val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp))) |
| assertEquals(Errors.NONE, error) |
| assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset)) |
| } |
| |
| @Test |
| def testFetchOffsetNotCoordinatorForGroup(): Unit = { |
| val tp = new TopicPartition("topic", 0) |
| val (error, partitionData) = groupCoordinator.handleFetchOffsets(otherGroupId, requireStable, Some(Seq(tp))) |
| assertEquals(Errors.NOT_COORDINATOR, error) |
| assertTrue(partitionData.isEmpty) |
| } |
| |
| @Test |
| def testFetchAllOffsets(): Unit = { |
| val tip1 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val tip2 = new TopicIdPartition(tip1.topicId, 1, "topic") |
| val tip3 = new TopicIdPartition(Uuid.randomUuid(), 0, "other-topic") |
| val offset1 = offsetAndMetadata(15) |
| val offset2 = offsetAndMetadata(16) |
| val offset3 = offsetAndMetadata(17) |
| |
| assertEquals((Errors.NONE, Map.empty), groupCoordinator.handleFetchOffsets(groupId, requireStable)) |
| |
| val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, |
| OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip1 -> offset1, tip2 -> offset2, tip3 -> offset3)) |
| assertEquals(Map(tip1 -> Errors.NONE, tip2 -> Errors.NONE, tip3 -> Errors.NONE), commitOffsetResult) |
| |
| val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable) |
| assertEquals(Errors.NONE, error) |
| assertEquals(3, partitionData.size) |
| assertTrue(partitionData.forall(_._2.error == Errors.NONE)) |
| assertEquals(Some(offset1.offset), partitionData.get(tip1.topicPartition).map(_.offset)) |
| assertEquals(Some(offset2.offset), partitionData.get(tip2.topicPartition).map(_.offset)) |
| assertEquals(Some(offset3.offset), partitionData.get(tip3.topicPartition).map(_.offset)) |
| } |
| |
| @Test |
| def testCommitOffsetInCompletingRebalance(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val assignedMemberId = joinGroupResult.memberId |
| val generationId = joinGroupResult.generationId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tip -> offset)) |
| assertEquals(Map(tip -> Errors.REBALANCE_IN_PROGRESS), commitOffsetResult) |
| } |
| |
| @Test |
| def testCommitOffsetInCompletingRebalanceFromUnknownMemberId(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0 , "topic") |
| val offset = offsetAndMetadata(0) |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val generationId = joinGroupResult.generationId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tip -> offset)) |
| assertEquals(Map(tip -> Errors.UNKNOWN_MEMBER_ID), commitOffsetResult) |
| } |
| |
| @Test |
| def testCommitOffsetInCompletingRebalanceFromIllegalGeneration(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val assignedMemberId = joinGroupResult.memberId |
| val generationId = joinGroupResult.generationId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId + 1, Map(tip -> offset)) |
| assertEquals(Map(tip -> Errors.ILLEGAL_GENERATION), commitOffsetResult) |
| } |
| |
| @Test |
| def testManualCommitOffsetShouldNotValidateMemberIdAndInstanceId(): Unit = { |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| |
| var commitOffsetResult = commitOffsets( |
| groupId, |
| JoinGroupRequest.UNKNOWN_MEMBER_ID, |
| -1, |
| Map(tip -> offsetAndMetadata(0)), |
| Some("instance-id") |
| ) |
| assertEquals(Map(tip -> Errors.NONE), commitOffsetResult) |
| |
| commitOffsetResult = commitOffsets( |
| groupId, |
| "unknown", |
| -1, |
| Map(tip -> offsetAndMetadata(0)), |
| None |
| ) |
| assertEquals(Map(tip -> Errors.NONE), commitOffsetResult) |
| } |
| |
| @Test |
| def testTxnCommitOffsetWithFencedInstanceId(): Unit = { |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| val producerId = 1000L |
| val producerEpoch : Short = 2 |
| |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| val leaderNoMemberIdCommitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, |
| Map(tip -> offset), memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, groupInstanceId = Some(leaderInstanceId)) |
| assertEquals(Map(tip -> Errors.FENCED_INSTANCE_ID), leaderNoMemberIdCommitOffsetResult) |
| |
| val leaderInvalidMemberIdCommitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, |
| Map(tip -> offset), memberId = "invalid-member", groupInstanceId = Some(leaderInstanceId)) |
| assertEquals(Map(tip -> Errors.FENCED_INSTANCE_ID), leaderInvalidMemberIdCommitOffsetResult) |
| |
| val leaderCommitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, |
| Map(tip -> offset), rebalanceResult.leaderId, Some(leaderInstanceId), rebalanceResult.generation) |
| assertEquals(Map(tip -> Errors.NONE), leaderCommitOffsetResult) |
| } |
| |
| @Test |
| def testTxnCommitOffsetWithInvalidMemberId(): Unit = { |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| val producerId = 1000L |
| val producerEpoch : Short = 2 |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val invalidIdCommitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, |
| Map(tip -> offset), "invalid-member") |
| assertEquals(Map(tip -> Errors.UNKNOWN_MEMBER_ID), invalidIdCommitOffsetResult) |
| } |
| |
| @Test |
| def testTxnCommitOffsetWithKnownMemberId(): Unit = { |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| val producerId = 1000L |
| val producerEpoch : Short = 2 |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| |
| val assignedConsumerId = joinGroupResult.memberId |
| val leaderCommitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, |
| Map(tip -> offset), assignedConsumerId, generationId = joinGroupResult.generationId) |
| assertEquals(Map(tip -> Errors.NONE), leaderCommitOffsetResult) |
| } |
| |
| @Test |
| def testTxnCommitOffsetWithIllegalGeneration(): Unit = { |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| val producerId = 1000L |
| val producerEpoch : Short = 2 |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| |
| val assignedConsumerId = joinGroupResult.memberId |
| val initialGenerationId = joinGroupResult.generationId |
| val illegalGenerationCommitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, |
| Map(tip -> offset), memberId = assignedConsumerId, generationId = initialGenerationId + 5) |
| assertEquals(Map(tip -> Errors.ILLEGAL_GENERATION), illegalGenerationCommitOffsetResult) |
| } |
| |
| @Test |
| def testTxnCommitOffsetWithLegalGeneration(): Unit = { |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| val producerId = 1000L |
| val producerEpoch : Short = 2 |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| |
| val assignedConsumerId = joinGroupResult.memberId |
| val initialGenerationId = joinGroupResult.generationId |
| val leaderCommitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, |
| Map(tip -> offset), memberId = assignedConsumerId, generationId = initialGenerationId) |
| assertEquals(Map(tip -> Errors.NONE), leaderCommitOffsetResult) |
| } |
| |
| @Test |
| def testHeartbeatDuringRebalanceCausesRebalanceInProgress(): Unit = { |
| // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts) |
| val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| val assignedConsumerId = joinGroupResult.memberId |
| val initialGenerationId = joinGroupResult.generationId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| // Then join with a new consumer to trigger a rebalance |
| sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| |
| // We should be in the middle of a rebalance, so the heartbeat should return rebalance in progress |
| val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId) |
| assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) |
| } |
| |
| @Test |
| def testGenerationIdIncrementsOnRebalance(): Unit = { |
| val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| val initialGenerationId = joinGroupResult.generationId |
| val joinGroupError = joinGroupResult.error |
| val memberId = joinGroupResult.memberId |
| assertEquals(1, initialGenerationId) |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val syncGroupResult = syncGroupLeader(groupId, initialGenerationId, memberId, Map(memberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| val joinGroupFuture = sendJoinGroup(groupId, memberId, protocolType, protocols) |
| val otherJoinGroupResult = await(joinGroupFuture, 1) |
| |
| val nextGenerationId = otherJoinGroupResult.generationId |
| val otherJoinGroupError = otherJoinGroupResult.error |
| assertEquals(2, nextGenerationId) |
| assertEquals(Errors.NONE, otherJoinGroupError) |
| } |
| |
| @Test |
| def testLeaveGroupWrongCoordinator(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val leaveGroupResults = singleLeaveGroup(otherGroupId, memberId) |
| verifyLeaveGroupResult(leaveGroupResults, Errors.NOT_COORDINATOR) |
| } |
| |
| @Test |
| def testLeaveGroupUnknownGroup(): Unit = { |
| val leaveGroupResults = singleLeaveGroup(groupId, memberId) |
| verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.UNKNOWN_MEMBER_ID)) |
| } |
| |
| @Test |
| def testLeaveGroupUnknownConsumerExistingGroup(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val otherMemberId = "consumerId" |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val leaveGroupResults = singleLeaveGroup(groupId, otherMemberId) |
| verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.UNKNOWN_MEMBER_ID)) |
| } |
| |
| @Test |
| def testSingleLeaveDeadGroup(): Unit = { |
| val deadGroupId = "deadGroupId" |
| |
| groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime())) |
| val leaveGroupResults = singleLeaveGroup(deadGroupId, memberId) |
| verifyLeaveGroupResult(leaveGroupResults, Errors.COORDINATOR_NOT_AVAILABLE) |
| } |
| |
| @Test |
| def testBatchLeaveDeadGroup(): Unit = { |
| val deadGroupId = "deadGroupId" |
| |
| groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime())) |
| val leaveGroupResults = batchLeaveGroup(deadGroupId, |
| List(new MemberIdentity().setMemberId(memberId), new MemberIdentity().setMemberId(memberId))) |
| verifyLeaveGroupResult(leaveGroupResults, Errors.COORDINATOR_NOT_AVAILABLE) |
| } |
| |
| @Test |
| def testValidLeaveGroup(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val assignedMemberId = joinGroupResult.memberId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId) |
| verifyLeaveGroupResult(leaveGroupResults) |
| } |
| |
| @Test |
| def testLeaveGroupWithFencedInstanceId(): Unit = { |
| val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset) |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| |
| val leaveGroupResults = singleLeaveGroup(groupId, "some_member", Some(leaderInstanceId)) |
| verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.FENCED_INSTANCE_ID)) |
| } |
| |
| @Test |
| def testLeaveGroupStaticMemberWithUnknownMemberId(): Unit = { |
| val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset) |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| |
| // Having unknown member id will not affect the request processing. |
| val leaveGroupResults = singleLeaveGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Some(leaderInstanceId)) |
| verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.NONE)) |
| } |
| |
| @Test |
| def testStaticMembersValidBatchLeaveGroup(): Unit = { |
| staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity() |
| .setGroupInstanceId(leaderInstanceId), new MemberIdentity().setGroupInstanceId(followerInstanceId))) |
| |
| verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.NONE, Errors.NONE)) |
| } |
| |
| @Test |
| def testStaticMembersWrongCoordinatorBatchLeaveGroup(): Unit = { |
| staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| val leaveGroupResults = batchLeaveGroup("invalid-group", List(new MemberIdentity() |
| .setGroupInstanceId(leaderInstanceId), new MemberIdentity().setGroupInstanceId(followerInstanceId))) |
| |
| verifyLeaveGroupResult(leaveGroupResults, Errors.NOT_COORDINATOR) |
| } |
| |
| @Test |
| def testStaticMembersUnknownGroupBatchLeaveGroup(): Unit = { |
| val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity() |
| .setGroupInstanceId(leaderInstanceId), new MemberIdentity().setGroupInstanceId(followerInstanceId))) |
| |
| verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID)) |
| } |
| |
| @Test |
| def testStaticMembersFencedInstanceBatchLeaveGroup(): Unit = { |
| staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity() |
| .setGroupInstanceId(leaderInstanceId), new MemberIdentity() |
| .setGroupInstanceId(followerInstanceId) |
| .setMemberId("invalid-member"))) |
| |
| verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.NONE, Errors.FENCED_INSTANCE_ID)) |
| } |
| |
| @Test |
| def testStaticMembersUnknownInstanceBatchLeaveGroup(): Unit = { |
| staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| |
| val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity() |
| .setGroupInstanceId("unknown-instance"), new MemberIdentity() |
| .setGroupInstanceId(followerInstanceId))) |
| |
| verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.UNKNOWN_MEMBER_ID, Errors.NONE)) |
| } |
| |
| @Test |
| def testPendingMemberBatchLeaveGroup(): Unit = { |
| val pendingMember = setupGroupWithPendingMember() |
| |
| val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity() |
| .setGroupInstanceId("unknown-instance"), new MemberIdentity() |
| .setMemberId(pendingMember.memberId))) |
| |
| verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.UNKNOWN_MEMBER_ID, Errors.NONE)) |
| } |
| |
| @Test |
| def testListGroupsIncludesStableGroups(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val assignedMemberId = joinGroupResult.memberId |
| val generationId = joinGroupResult.generationId |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| |
| val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| val (error, groups) = groupCoordinator.handleListGroups(Set()) |
| assertEquals(Errors.NONE, error) |
| assertEquals(1, groups.size) |
| assertEquals(GroupOverview("groupId", "consumer", Stable.toString), groups.head) |
| } |
| |
| @Test |
| def testListGroupsIncludesRebalancingGroups(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| |
| val (error, groups) = groupCoordinator.handleListGroups(Set()) |
| assertEquals(Errors.NONE, error) |
| assertEquals(1, groups.size) |
| assertEquals(GroupOverview("groupId", "consumer", CompletingRebalance.toString), groups.head) |
| } |
| |
| @Test |
| def testListGroupsWithStates(): Unit = { |
| val allStates = Set(PreparingRebalance, CompletingRebalance, Stable, Dead, Empty).map(s => s.toString) |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| // Member joins the group |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val assignedMemberId = joinGroupResult.memberId |
| val generationId = joinGroupResult.generationId |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| |
| // The group should be in CompletingRebalance |
| val (error, groups) = groupCoordinator.handleListGroups(Set(CompletingRebalance.toString)) |
| assertEquals(Errors.NONE, error) |
| assertEquals(1, groups.size) |
| val (error2, groups2) = groupCoordinator.handleListGroups(allStates.filterNot(s => s == CompletingRebalance.toString)) |
| assertEquals(Errors.NONE, error2) |
| assertEquals(0, groups2.size) |
| |
| // Member syncs |
| val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| // The group is now stable |
| val (error3, groups3) = groupCoordinator.handleListGroups(Set(Stable.toString)) |
| assertEquals(Errors.NONE, error3) |
| assertEquals(1, groups3.size) |
| val (error4, groups4) = groupCoordinator.handleListGroups(allStates.filterNot(s => s == Stable.toString)) |
| assertEquals(Errors.NONE, error4) |
| assertEquals(0, groups4.size) |
| |
| // Member leaves |
| val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId) |
| verifyLeaveGroupResult(leaveGroupResults) |
| |
| // The group is now empty |
| val (error5, groups5) = groupCoordinator.handleListGroups(Set(Empty.toString)) |
| assertEquals(Errors.NONE, error5) |
| assertEquals(1, groups5.size) |
| val (error6, groups6) = groupCoordinator.handleListGroups(allStates.filterNot(s => s == Empty.toString)) |
| assertEquals(Errors.NONE, error6) |
| assertEquals(0, groups6.size) |
| } |
| |
| @Test |
| def testDescribeGroupWrongCoordinator(): Unit = { |
| val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId) |
| assertEquals(Errors.NOT_COORDINATOR, error) |
| } |
| |
| @Test |
| def testDescribeGroupInactiveGroup(): Unit = { |
| val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) |
| assertEquals(Errors.NONE, error) |
| assertEquals(GroupCoordinator.DeadGroup, summary) |
| } |
| |
| @Test |
| def testDescribeGroupStableForDynamicMember(): Unit = { |
| val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| val assignedMemberId = joinGroupResult.memberId |
| val generationId = joinGroupResult.generationId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) |
| assertEquals(Errors.NONE, error) |
| assertEquals(protocolType, summary.protocolType) |
| assertEquals("range", summary.protocol) |
| assertEquals(List(assignedMemberId), summary.members.map(_.memberId)) |
| } |
| |
| @Test |
| def testDescribeGroupStableForStaticMember(): Unit = { |
| val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocols) |
| val assignedMemberId = joinGroupResult.memberId |
| val generationId = joinGroupResult.generationId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) |
| assertEquals(Errors.NONE, error) |
| assertEquals(protocolType, summary.protocolType) |
| assertEquals("range", summary.protocol) |
| assertEquals(List(assignedMemberId), summary.members.map(_.memberId)) |
| assertEquals(List(leaderInstanceId), summary.members.flatMap(_.groupInstanceId)) |
| } |
| |
| @Test |
| def testDescribeGroupRebalancing(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) |
| assertEquals(Errors.NONE, error) |
| assertEquals(protocolType, summary.protocolType) |
| assertEquals(GroupCoordinator.NoProtocol, summary.protocol) |
| assertEquals(CompletingRebalance.toString, summary.state) |
| assertTrue(summary.members.map(_.memberId).contains(joinGroupResult.memberId)) |
| assertTrue(summary.members.forall(_.metadata.isEmpty)) |
| assertTrue(summary.members.forall(_.assignment.isEmpty)) |
| } |
| |
| @Test |
| def testDeleteNonEmptyGroup(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| |
| val result = groupCoordinator.handleDeleteGroups(Set(groupId)) |
| assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NON_EMPTY_GROUP)) |
| } |
| |
| @Test |
| def testDeleteGroupWithInvalidGroupId(): Unit = { |
| val invalidGroupId = null |
| val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId)) |
| assert(result.size == 1 && result.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID)) |
| } |
| |
| @Test |
| def testDeleteGroupWithWrongCoordinator(): Unit = { |
| val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId)) |
| assert(result.size == 1 && result.contains(otherGroupId) && result.get(otherGroupId).contains(Errors.NOT_COORDINATOR)) |
| } |
| |
| @Test |
| def testDeleteEmptyGroup(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| |
| val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId) |
| verifyLeaveGroupResult(leaveGroupResults) |
| |
| val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) |
| val partition: Partition = mock(classOf[Partition]) |
| |
| when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) |
| when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition)) |
| when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) |
| |
| val result = groupCoordinator.handleDeleteGroups(Set(groupId)) |
| assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE)) |
| } |
| |
| @Test |
| def testDeleteEmptyGroupWithStoredOffsets(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| val assignedMemberId = joinGroupResult.memberId |
| val joinGroupError = joinGroupResult.error |
| assertEquals(Errors.NONE, joinGroupError) |
| |
| val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") |
| val offset = offsetAndMetadata(0) |
| val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, Map(tip -> offset)) |
| assertEquals(Map(tip -> Errors.NONE), commitOffsetResult) |
| |
| val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId) |
| assertEquals(Stable.toString, describeGroupResult._2.state) |
| assertEquals(assignedMemberId, describeGroupResult._2.members.head.memberId) |
| |
| val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId) |
| verifyLeaveGroupResult(leaveGroupResults) |
| |
| val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) |
| val partition: Partition = mock(classOf[Partition]) |
| |
| when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) |
| when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition)) |
| when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) |
| |
| val result = groupCoordinator.handleDeleteGroups(Set(groupId)) |
| assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE)) |
| |
| assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId)._2.state) |
| } |
| |
| @Test |
| def testDeleteOffsetOfNonExistingGroup(): Unit = { |
| val tp = new TopicPartition("foo", 0) |
| val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp), |
| RequestLocal.NoCaching) |
| |
| assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError) |
| assertTrue(topics.isEmpty) |
| } |
| |
| @Test |
| def testDeleteOffsetOfNonEmptyNonConsumerGroup(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| dynamicJoinGroup(groupId, memberId, "My Protocol", protocols) |
| val tp = new TopicPartition("foo", 0) |
| val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp), |
| RequestLocal.NoCaching) |
| |
| assertEquals(Errors.NON_EMPTY_GROUP, groupError) |
| assertTrue(topics.isEmpty) |
| } |
| |
  /**
   * An empty group with a non-consumer protocol type can have offsets deleted;
   * the consumer-subscription check does not block the deletion.
   */
  @Test
  def testDeleteOffsetOfEmptyNonConsumerGroup(): Unit = {
    // join the group
    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID

    val joinGroupResult = dynamicJoinGroup(groupId, memberId, "My Protocol", protocols)
    assertEquals(Errors.NONE, joinGroupResult.error)

    val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
    assertEquals(Errors.NONE, syncGroupResult.error)

    // Commit offsets for two partitions; only "foo" will be deleted below.
    val ti1p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
    val ti2p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "bar")
    val offset = offsetAndMetadata(37)

    val validOffsetCommitResult = commitOffsets(groupId, joinGroupResult.memberId, joinGroupResult.generationId,
      Map(ti1p0 -> offset, ti2p0 -> offset))
    assertEquals(Map(ti1p0 -> Errors.NONE, ti2p0 -> Errors.NONE), validOffsetCommitResult)

    // and leaves.
    val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId)
    verifyLeaveGroupResult(leaveGroupResults)

    assertTrue(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Empty)))

    // Stub the group metadata partition as online; the delete path consults
    // getMagic/getPartition/onlinePartition on the mocked replica manager.
    val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
    val partition: Partition = mock(classOf[Partition])

    when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
    when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
    when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))

    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(ti1p0.topicPartition),
      RequestLocal.NoCaching)

    assertEquals(Errors.NONE, groupError)
    assertEquals(1, topics.size)
    assertEquals(Some(Errors.NONE), topics.get(ti1p0.topicPartition))

    // The deleted partition now reports an invalid offset while the untouched
    // partition still returns the committed value.
    val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId, requireStable, Some(Seq(ti1p0.topicPartition, ti2p0.topicPartition)))

    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(ti1p0.topicPartition).map(_.offset))
    assertEquals(Some(offset.offset), cachedOffsets.get(ti2p0.topicPartition).map(_.offset))
  }
| |
| @Test |
| def testDeleteOffsetOfConsumerGroupWithUnparsableProtocol(): Unit = { |
| val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID |
| val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) |
| |
| val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty) |
| assertEquals(Errors.NONE, syncGroupResult.error) |
| |
| val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "foo") |
| val offset = offsetAndMetadata(37) |
| |
| val validOffsetCommitResult = commitOffsets(groupId, joinGroupResult.memberId, joinGroupResult.generationId, |
| Map(tip -> offset)) |
| assertEquals(Map(tip -> Errors.NONE), validOffsetCommitResult) |
| |
| val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tip.topicPartition), |
| RequestLocal.NoCaching) |
| |
| assertEquals(Errors.NONE, groupError) |
| assertEquals(1, topics.size) |
| assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), topics.get(tip.topicPartition)) |
| } |
| |
| @Test |
| def testDeleteOffsetOfDeadConsumerGroup(): Unit = { |
| val group = new GroupMetadata(groupId, Dead, new MockTime()) |
| group.protocolType = Some(protocolType) |
| groupCoordinator.groupManager.addGroup(group) |
| |
| val tp = new TopicPartition("foo", 0) |
| val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp), |
| RequestLocal.NoCaching) |
| |
| assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError) |
| assertTrue(topics.isEmpty) |
| } |
| |
  /**
   * An empty consumer group can have offsets deleted for any topic, since no
   * member is subscribed to anything anymore.
   */
  @Test
  def testDeleteOffsetOfEmptyConsumerGroup(): Unit = {
    // join the group
    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
    assertEquals(Errors.NONE, joinGroupResult.error)

    val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
    assertEquals(Errors.NONE, syncGroupResult.error)

    // Commit offsets for two partitions; only "foo" will be deleted below.
    val ti1p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
    val ti2p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "bar")
    val offset = offsetAndMetadata(37)

    val validOffsetCommitResult = commitOffsets(groupId, joinGroupResult.memberId, joinGroupResult.generationId,
      Map(ti1p0 -> offset, ti2p0 -> offset))
    assertEquals(Map(ti1p0 -> Errors.NONE, ti2p0 -> Errors.NONE), validOffsetCommitResult)

    // and leaves.
    val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId)
    verifyLeaveGroupResult(leaveGroupResults)

    assertTrue(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Empty)))

    // Stub the group metadata partition as online; the delete path consults
    // getMagic/getPartition/onlinePartition on the mocked replica manager.
    val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
    val partition: Partition = mock(classOf[Partition])

    when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
    when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
    when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))

    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(ti1p0.topicPartition),
      RequestLocal.NoCaching)

    assertEquals(Errors.NONE, groupError)
    assertEquals(1, topics.size)
    assertEquals(Some(Errors.NONE), topics.get(ti1p0.topicPartition))

    // The deleted partition now reports an invalid offset while the untouched
    // partition still returns the committed value.
    val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId, requireStable, Some(Seq(ti1p0.topicPartition, ti2p0.topicPartition)))

    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(ti1p0.topicPartition).map(_.offset))
    assertEquals(Some(offset.offset), cachedOffsets.get(ti2p0.topicPartition).map(_.offset))
  }
| |
  /**
   * In a Stable consumer group, offsets may only be deleted for topics the
   * group is NOT subscribed to: here the member subscribes to "bar", so the
   * "foo" offset is deletable but the "bar" offset is not.
   */
  @Test
  def testDeleteOffsetOfStableConsumerGroup(): Unit = {
    // join the group
    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
    val subscription = new Subscription(List("bar").asJava)

    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType,
      List(("protocol", ConsumerProtocol.serializeSubscription(subscription).array())))
    assertEquals(Errors.NONE, joinGroupResult.error)

    val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
    assertEquals(Errors.NONE, syncGroupResult.error)

    val ti1p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
    val ti2p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "bar")
    val offset = offsetAndMetadata(37)

    val validOffsetCommitResult = commitOffsets(groupId, joinGroupResult.memberId, joinGroupResult.generationId,
      Map(ti1p0 -> offset, ti2p0 -> offset))
    assertEquals(Map(ti1p0 -> Errors.NONE, ti2p0 -> Errors.NONE), validOffsetCommitResult)

    assertTrue(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Stable)))

    // Stub the group metadata partition as online; the delete path consults
    // getMagic/getPartition/onlinePartition on the mocked replica manager.
    val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
    val partition: Partition = mock(classOf[Partition])

    when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
    when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
    when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))

    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(ti1p0.topicPartition, ti2p0.topicPartition),
      RequestLocal.NoCaching)

    // "foo" (not subscribed) deletes fine; "bar" (subscribed) is refused.
    assertEquals(Errors.NONE, groupError)
    assertEquals(2, topics.size)
    assertEquals(Some(Errors.NONE), topics.get(ti1p0.topicPartition))
    assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), topics.get(ti2p0.topicPartition))

    // Only the deleted partition loses its cached offset.
    val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId, requireStable, Some(Seq(ti1p0.topicPartition, ti2p0.topicPartition)))

    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(ti1p0.topicPartition).map(_.offset))
    assertEquals(Some(offset.offset), cachedOffsets.get(ti2p0.topicPartition).map(_.offset))
  }
| |
| @Test |
| def shouldDelayInitialRebalanceByGroupInitialRebalanceDelayOnEmptyGroup(): Unit = { |
| val firstJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) |
| timer.advanceClock(GroupInitialRebalanceDelay / 2) |
| verifyDelayedTaskNotCompleted(firstJoinFuture) |
| timer.advanceClock((GroupInitialRebalanceDelay / 2) + 1) |
| val joinGroupResult = await(firstJoinFuture, 1) |
| assertEquals(Errors.NONE, joinGroupResult.error) |
| } |
| |
| private def verifyDelayedTaskNotCompleted(firstJoinFuture: Future[JoinGroupResult]) = { |
| assertThrows(classOf[TimeoutException], () => await(firstJoinFuture, 1), |
| () => "should have timed out as rebalance delay not expired") |
| } |
| |
| @Test |
| def shouldResetRebalanceDelayWhenNewMemberJoinsGroupInInitialRebalance(): Unit = { |
| val rebalanceTimeout = GroupInitialRebalanceDelay * 3 |
| val firstMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout) |
| timer.advanceClock(GroupInitialRebalanceDelay - 1) |
| val secondMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout) |
| timer.advanceClock(2) |
| |
| // advance past initial rebalance delay and make sure that tasks |
| // haven't been completed |
| timer.advanceClock(GroupInitialRebalanceDelay / 2 + 1) |
| verifyDelayedTaskNotCompleted(firstMemberJoinFuture) |
| verifyDelayedTaskNotCompleted(secondMemberJoinFuture) |
| // advance clock beyond updated delay and make sure the |
| // tasks have completed |
| timer.advanceClock(GroupInitialRebalanceDelay / 2) |
| val firstResult = await(firstMemberJoinFuture, 1) |
| val secondResult = await(secondMemberJoinFuture, 1) |
| assertEquals(Errors.NONE, firstResult.error) |
| assertEquals(Errors.NONE, secondResult.error) |
| } |
| |
| @Test |
| def shouldDelayRebalanceUptoRebalanceTimeout(): Unit = { |
| val rebalanceTimeout = GroupInitialRebalanceDelay * 2 |
| val firstMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout) |
| val secondMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout) |
| timer.advanceClock(GroupInitialRebalanceDelay + 1) |
| val thirdMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout) |
| timer.advanceClock(GroupInitialRebalanceDelay) |
| |
| verifyDelayedTaskNotCompleted(firstMemberJoinFuture) |
| verifyDelayedTaskNotCompleted(secondMemberJoinFuture) |
| verifyDelayedTaskNotCompleted(thirdMemberJoinFuture) |
| |
| // advance clock beyond rebalanceTimeout |
| timer.advanceClock(1) |
| |
| val firstResult = await(firstMemberJoinFuture, 1) |
| val secondResult = await(secondMemberJoinFuture, 1) |
| val thirdResult = await(thirdMemberJoinFuture, 1) |
| assertEquals(Errors.NONE, firstResult.error) |
| assertEquals(Errors.NONE, secondResult.error) |
| assertEquals(Errors.NONE, thirdResult.error) |
| } |
| |
| @Test |
| def testCompleteHeartbeatWithGroupDead(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| heartbeat(groupId, rebalanceResult.leaderId, rebalanceResult.generation) |
| val group = getGroup(groupId) |
| group.transitionTo(Dead) |
| val leaderMemberId = rebalanceResult.leaderId |
| assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, () => true)) |
| groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false) |
| assertTrue(group.has(leaderMemberId)) |
| } |
| |
| @Test |
| def testCompleteHeartbeatWithMemberAlreadyRemoved(): Unit = { |
| val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) |
| heartbeat(groupId, rebalanceResult.leaderId, rebalanceResult.generation) |
| val group = getGroup(groupId) |
| val leaderMemberId = rebalanceResult.leaderId |
| group.remove(leaderMemberId) |
| assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, () => true)) |
| } |
| |
| private def getGroup(groupId: String): GroupMetadata = { |
| val groupOpt = groupCoordinator.groupManager.getGroup(groupId) |
| assertTrue(groupOpt.isDefined) |
| groupOpt.get |
| } |
| |
| private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = { |
| val responsePromise = Promise[JoinGroupResult]() |
| val responseFuture = responsePromise.future |
| val responseCallback: JoinGroupCallback = responsePromise.success |
| (responseFuture, responseCallback) |
| } |
| |
| private def setupSyncGroupCallback: (Future[SyncGroupResult], SyncGroupCallback) = { |
| val responsePromise = Promise[SyncGroupResult]() |
| val responseFuture = responsePromise.future |
| val responseCallback: SyncGroupCallback = responsePromise.success |
| (responseFuture, responseCallback) |
| } |
| |
| private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = { |
| val responsePromise = Promise[HeartbeatCallbackParams]() |
| val responseFuture = responsePromise.future |
| val responseCallback: HeartbeatCallback = error => responsePromise.success(error) |
| (responseFuture, responseCallback) |
| } |
| |
| private def setupCommitOffsetsCallback: (Future[CommitOffsetCallbackParams], CommitOffsetCallback) = { |
| val responsePromise = Promise[CommitOffsetCallbackParams]() |
| val responseFuture = responsePromise.future |
| val responseCallback: CommitOffsetCallback = offsets => responsePromise.success(offsets) |
| (responseFuture, responseCallback) |
| } |
| |
| private def setupLeaveGroupCallback: (Future[LeaveGroupResult], LeaveGroupCallback) = { |
| val responsePromise = Promise[LeaveGroupResult]() |
| val responseFuture = responsePromise.future |
| val responseCallback: LeaveGroupCallback = result => responsePromise.success(result) |
| (responseFuture, responseCallback) |
| } |
| |
| private def sendJoinGroup(groupId: String, |
| memberId: String, |
| protocolType: String, |
| protocols: List[(String, Array[Byte])], |
| groupInstanceId: Option[String] = None, |
| sessionTimeout: Int = DefaultSessionTimeout, |
| rebalanceTimeout: Int = DefaultRebalanceTimeout, |
| requireKnownMemberId: Boolean = false, |
| supportSkippingAssignment: Boolean = true): Future[JoinGroupResult] = { |
| val (responseFuture, responseCallback) = setupJoinGroupCallback |
| |
| when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) |
| |
| groupCoordinator.handleJoinGroup(groupId, memberId, groupInstanceId, requireKnownMemberId, supportSkippingAssignment, |
| "clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback) |
| responseFuture |
| } |
| |
  /**
   * Like sendJoinGroup, but for a static member and with the record-append
   * path stubbed so that the persisted join completes with `appendRecordError`
   * (Errors.NONE for a successful append).
   */
  private def sendStaticJoinGroupWithPersistence(groupId: String,
                                                 memberId: String,
                                                 protocolType: String,
                                                 protocols: List[(String, Array[Byte])],
                                                 groupInstanceId: String,
                                                 sessionTimeout: Int,
                                                 rebalanceTimeout: Int,
                                                 appendRecordError: Errors,
                                                 requireKnownMemberId: Boolean = false,
                                                 supportSkippingAssignment: Boolean): Future[JoinGroupResult] = {
    val (responseFuture, responseCallback) = setupJoinGroupCallback

    // Capture the append-completion callback so we can invoke it ourselves
    // with the desired per-partition response below.
    val capturedArgument: ArgumentCaptor[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[scala.collection.Map[TopicPartition, PartitionResponse] => Unit])

    when(replicaManager.appendRecords(anyLong,
      anyShort(),
      internalTopicsAllowed = ArgumentMatchers.eq(true),
      origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
      any[Map[TopicPartition, MemoryRecords]],
      capturedArgument.capture(),
      any[Option[ReentrantLock]],
      any(),
      any(),
      any(),
      any(),
      any()
    )).thenAnswer(_ => {
      // Complete the append immediately with the configured error for the
      // group metadata partition.
      capturedArgument.getValue.apply(
        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
          new PartitionResponse(appendRecordError, 0L, RecordBatch.NO_TIMESTAMP, 0L)
        )
      )
    })
    when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))

    groupCoordinator.handleJoinGroup(groupId, memberId, Some(groupInstanceId), requireKnownMemberId, supportSkippingAssignment,
      "clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback)
    responseFuture
  }
| |
  /**
   * Issues an asynchronous sync-group request on behalf of the leader (i.e.
   * carrying the full assignment), with the record-append path stubbed to
   * succeed, and returns the response future.
   */
  private def sendSyncGroupLeader(groupId: String,
                                  generation: Int,
                                  leaderId: String,
                                  protocolType: Option[String],
                                  protocolName: Option[String],
                                  groupInstanceId: Option[String],
                                  assignment: Map[String, Array[Byte]]): Future[SyncGroupResult] = {
    val (responseFuture, responseCallback) = setupSyncGroupCallback

    // Capture the append-completion callback so we can complete the append
    // with a successful per-partition response below.
    val capturedArgument: ArgumentCaptor[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[scala.collection.Map[TopicPartition, PartitionResponse] => Unit])

    when(replicaManager.appendRecords(anyLong,
      anyShort(),
      internalTopicsAllowed = ArgumentMatchers.eq(true),
      origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
      any[Map[TopicPartition, MemoryRecords]],
      capturedArgument.capture(),
      any[Option[ReentrantLock]],
      any(),
      any(),
      any(),
      any(),
      any())).thenAnswer(_ => {
      capturedArgument.getValue.apply(
        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
        )
      )
    }
    )
    when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))

    groupCoordinator.handleSyncGroup(groupId, generation, leaderId, protocolType, protocolName,
      groupInstanceId, assignment, responseCallback)
    responseFuture
  }
| |
| private def sendSyncGroupFollower(groupId: String, |
| generation: Int, |
| memberId: String, |
| prototolType: Option[String] = None, |
| prototolName: Option[String] = None, |
| groupInstanceId: Option[String] = None): Future[SyncGroupResult] = { |
| val (responseFuture, responseCallback) = setupSyncGroupCallback |
| |
| |
| groupCoordinator.handleSyncGroup(groupId, generation, memberId, |
| prototolType, prototolName, groupInstanceId, Map.empty[String, Array[Byte]], responseCallback) |
| responseFuture |
| } |
| |
| private def dynamicJoinGroup(groupId: String, |
| memberId: String, |
| protocolType: String, |
| protocols: List[(String, Array[Byte])], |
| sessionTimeout: Int = DefaultSessionTimeout, |
| rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = { |
| val requireKnownMemberId = true |
| var responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, None, sessionTimeout, rebalanceTimeout, requireKnownMemberId) |
| |
| // Since member id is required, we need another bounce to get the successful join group result. |
| if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID && requireKnownMemberId) { |
| val joinGroupResult = Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS)) |
| // If some other error is triggered, return the error immediately for caller to handle. |
| if (joinGroupResult.error != Errors.MEMBER_ID_REQUIRED) { |
| return joinGroupResult |
| } |
| responseFuture = sendJoinGroup(groupId, joinGroupResult.memberId, protocolType, protocols, None, sessionTimeout, rebalanceTimeout, requireKnownMemberId) |
| } |
| timer.advanceClock(GroupInitialRebalanceDelay + 1) |
| // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay |
| Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS)) |
| } |
| |
| private def staticJoinGroup(groupId: String, |
| memberId: String, |
| groupInstanceId: String, |
| protocolType: String, |
| protocols: List[(String, Array[Byte])], |
| clockAdvance: Int = GroupInitialRebalanceDelay + 1, |
| sessionTimeout: Int = DefaultSessionTimeout, |
| rebalanceTimeout: Int = DefaultRebalanceTimeout, |
| supportSkippingAssignment: Boolean = true): JoinGroupResult = { |
| val responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, Some(groupInstanceId), sessionTimeout, rebalanceTimeout, |
| supportSkippingAssignment = supportSkippingAssignment) |
| |
| timer.advanceClock(clockAdvance) |
| // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay |
| Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS)) |
| } |
| |
| private def staticJoinGroupWithPersistence(groupId: String, |
| memberId: String, |
| groupInstanceId: String, |
| protocolType: String, |
| protocols: List[(String, Array[Byte])], |
| clockAdvance: Int, |
| sessionTimeout: Int = DefaultSessionTimeout, |
| rebalanceTimeout: Int = DefaultRebalanceTimeout, |
| appendRecordError: Errors = Errors.NONE, |
| supportSkippingAssignment: Boolean = true): JoinGroupResult = { |
| val responseFuture = sendStaticJoinGroupWithPersistence(groupId, memberId, protocolType, protocols, |
| groupInstanceId, sessionTimeout, rebalanceTimeout, appendRecordError, supportSkippingAssignment = supportSkippingAssignment) |
| |
| timer.advanceClock(clockAdvance) |
| // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay |
| Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS)) |
| } |
| |
| private def syncGroupFollower(groupId: String, |
| generationId: Int, |
| memberId: String, |
| protocolType: Option[String] = None, |
| protocolName: Option[String] = None, |
| groupInstanceId: Option[String] = None, |
| sessionTimeout: Int = DefaultSessionTimeout): SyncGroupResult = { |
| val responseFuture = sendSyncGroupFollower(groupId, generationId, memberId, protocolType, |
| protocolName, groupInstanceId) |
| Await.result(responseFuture, Duration(sessionTimeout + 100, TimeUnit.MILLISECONDS)) |
| } |
| |
| private def syncGroupLeader(groupId: String, |
| generationId: Int, |
| memberId: String, |
| assignment: Map[String, Array[Byte]], |
| protocolType: Option[String] = None, |
| protocolName: Option[String] = None, |
| groupInstanceId: Option[String] = None, |
| sessionTimeout: Int = DefaultSessionTimeout): SyncGroupResult = { |
| val responseFuture = sendSyncGroupLeader(groupId, generationId, memberId, protocolType, |
| protocolName, groupInstanceId, assignment) |
| Await.result(responseFuture, Duration(sessionTimeout + 100, TimeUnit.MILLISECONDS)) |
| } |
| |
| private def heartbeat(groupId: String, |
| consumerId: String, |
| generationId: Int, |
| groupInstanceId: Option[String] = None): HeartbeatCallbackParams = { |
| val (responseFuture, responseCallback) = setupHeartbeatCallback |
| |
| |
| groupCoordinator.handleHeartbeat(groupId, consumerId, groupInstanceId, generationId, responseCallback) |
| Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) |
| } |
| |
| private def await[T](future: Future[T], millis: Long): T = { |
| Await.result(future, Duration(millis, TimeUnit.MILLISECONDS)) |
| } |
| |
  /**
   * Commits the given offsets on behalf of a member, with the record-append
   * path stubbed to succeed, and blocks for the per-partition commit errors.
   */
  private def commitOffsets(groupId: String,
                            memberId: String,
                            generationId: Int,
                            offsets: Map[TopicIdPartition, OffsetAndMetadata],
                            groupInstanceId: Option[String] = None): CommitOffsetCallbackParams = {
    val (responseFuture, responseCallback) = setupCommitOffsetsCallback

    // Capture the append-completion callback so we can complete the append
    // with a successful per-partition response below.
    val capturedArgument: ArgumentCaptor[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[scala.collection.Map[TopicPartition, PartitionResponse] => Unit])

    when(replicaManager.appendRecords(anyLong,
      anyShort(),
      internalTopicsAllowed = ArgumentMatchers.eq(true),
      origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
      any[Map[TopicPartition, MemoryRecords]],
      capturedArgument.capture(),
      any[Option[ReentrantLock]],
      any(),
      any(),
      any(),
      any(),
      any())
    ).thenAnswer(_ => {
      capturedArgument.getValue.apply(
        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
        )
      )
    })
    when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))

    groupCoordinator.handleCommitOffsets(groupId, memberId, groupInstanceId, generationId, offsets, responseCallback)
    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
  }
| |
| private def commitTransactionalOffsets(groupId: String, |
| producerId: Long, |
| producerEpoch: Short, |
| offsets: Map[TopicIdPartition, OffsetAndMetadata], |
| memberId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID, |
| groupInstanceId: Option[String] = Option.empty, |
| generationId: Int = JoinGroupRequest.UNKNOWN_GENERATION_ID) : CommitOffsetCallbackParams = { |
| val (responseFuture, responseCallback) = setupCommitOffsetsCallback |
| |
| val capturedArgument: ArgumentCaptor[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[scala.collection.Map[TopicPartition, PartitionResponse] => Unit]) |
| |
| when(replicaManager.appendRecords(anyLong, |
| anyShort(), |
| internalTopicsAllowed = ArgumentMatchers.eq(true), |
| origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), |
| any[Map[TopicPartition, MemoryRecords]], |
| capturedArgument.capture(), |
| any[Option[ReentrantLock]], |
| any(), |
| any(), |
| any(), |
| any(), |
| any()) |
| ).thenAnswer(_ => { |
| capturedArgument.getValue.apply( |
| Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) -> |
| new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L) |
| ) |
| ) |
| }) |
| when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) |
| |
| groupCoordinator.handleTxnCommitOffsets(groupId, producerId, producerEpoch, |
| memberId, groupInstanceId, generationId, offsets, responseCallback) |
| val result = Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) |
| result |
| } |
| |
| private def singleLeaveGroup(groupId: String, |
| consumerId: String, |
| groupInstanceId: Option[String] = None): LeaveGroupResult = { |
| val singleMemberIdentity = List( |
| new MemberIdentity() |
| .setMemberId(consumerId) |
| .setGroupInstanceId(groupInstanceId.orNull)) |
| batchLeaveGroup(groupId, singleMemberIdentity) |
| } |
| |
| private def batchLeaveGroup(groupId: String, |
| memberIdentities: List[MemberIdentity]): LeaveGroupResult = { |
| val (responseFuture, responseCallback) = setupLeaveGroupCallback |
| |
| when(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))) |
| .thenReturn(HostedPartition.None) |
| when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) |
| |
| groupCoordinator.handleLeaveGroup(groupId, memberIdentities, responseCallback) |
| Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) |
| } |
| |
| def handleTxnCompletion(producerId: Long, |
| offsetsPartitions: Iterable[TopicPartition], |
| transactionResult: TransactionResult): Unit = { |
| val isCommit = transactionResult == TransactionResult.COMMIT |
| groupCoordinator.groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit) |
| } |
| |
| private def offsetAndMetadata(offset: Long): OffsetAndMetadata = { |
| OffsetAndMetadata(offset, "", timer.time.milliseconds()) |
| } |
| } |
| |
object GroupCoordinatorTest {
  /**
   * Asserts that a leave-group response carries the expected top-level error
   * and, when `expectedMemberLevelErrors` is non-empty, the expected
   * per-member errors in the same order as the member responses.
   */
  def verifyLeaveGroupResult(leaveGroupResult: LeaveGroupResult,
                             expectedTopLevelError: Errors = Errors.NONE,
                             expectedMemberLevelErrors: List[Errors] = List.empty): Unit = {
    assertEquals(expectedTopLevelError, leaveGroupResult.topLevelError)
    if (expectedMemberLevelErrors.nonEmpty) {
      assertEquals(expectedMemberLevelErrors.size, leaveGroupResult.memberResponses.size)
      expectedMemberLevelErrors.zip(leaveGroupResult.memberResponses).foreach {
        case (expectedError, memberResponse) => assertEquals(expectedError, memberResponse.error)
      }
    }
  }
}