| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.coordinator.group; |
| |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.Uuid; |
| import org.apache.kafka.common.errors.CoordinatorNotAvailableException; |
| import org.apache.kafka.common.errors.GroupIdNotFoundException; |
| import org.apache.kafka.common.errors.IllegalGenerationException; |
| import org.apache.kafka.common.errors.RebalanceInProgressException; |
| import org.apache.kafka.common.errors.StaleMemberEpochException; |
| import org.apache.kafka.common.errors.UnknownMemberIdException; |
| import org.apache.kafka.common.errors.UnsupportedVersionException; |
| import org.apache.kafka.common.message.JoinGroupRequestData; |
| import org.apache.kafka.common.message.OffsetCommitRequestData; |
| import org.apache.kafka.common.message.OffsetCommitResponseData; |
| import org.apache.kafka.common.message.OffsetDeleteRequestData; |
| import org.apache.kafka.common.message.OffsetDeleteResponseData; |
| import org.apache.kafka.common.message.OffsetFetchRequestData; |
| import org.apache.kafka.common.message.OffsetFetchResponseData; |
| import org.apache.kafka.common.message.TxnOffsetCommitRequestData; |
| import org.apache.kafka.common.message.TxnOffsetCommitResponseData; |
| import org.apache.kafka.common.network.ClientInformation; |
| import org.apache.kafka.common.network.ListenerName; |
| import org.apache.kafka.common.protocol.ApiKeys; |
| import org.apache.kafka.common.protocol.ApiMessage; |
| import org.apache.kafka.common.protocol.Errors; |
| import org.apache.kafka.common.record.RecordBatch; |
| import org.apache.kafka.common.requests.RequestContext; |
| import org.apache.kafka.common.requests.RequestHeader; |
| import org.apache.kafka.common.requests.TransactionResult; |
| import org.apache.kafka.common.security.auth.KafkaPrincipal; |
| import org.apache.kafka.common.security.auth.SecurityProtocol; |
| import org.apache.kafka.common.utils.LogContext; |
| import org.apache.kafka.common.utils.MockTime; |
| import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; |
| import org.apache.kafka.coordinator.group.assignor.RangeAssignor; |
| import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; |
| import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; |
| import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; |
| import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; |
| import org.apache.kafka.coordinator.group.classic.ClassicGroup; |
| import org.apache.kafka.coordinator.group.classic.ClassicGroupMember; |
| import org.apache.kafka.coordinator.group.classic.ClassicGroupState; |
| import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; |
| import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; |
| import org.apache.kafka.image.MetadataImage; |
| import org.apache.kafka.server.common.ApiMessageAndVersion; |
| import org.apache.kafka.server.common.MetadataVersion; |
| import org.apache.kafka.timeline.SnapshotRegistry; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.EnumSource; |
| |
| import java.net.InetAddress; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.OptionalInt; |
| import java.util.OptionalLong; |
| |
| import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET; |
| import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME; |
| import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_DELETIONS_SENSOR_NAME; |
| import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertThrows; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| public class OffsetMetadataManagerTest { |
| static class OffsetMetadataManagerTestContext { |
| public static class Builder { |
| private final MockTime time = new MockTime(); |
| private final MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(time); |
| private final LogContext logContext = new LogContext(); |
| private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); |
| private GroupMetadataManager groupMetadataManager = null; |
| private MetadataImage metadataImage = null; |
| private GroupCoordinatorConfig config = null; |
| private GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class); |
| |
| Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { |
| config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60 * 1000); |
| return this; |
| } |
| |
| Builder withOffsetsRetentionMs(long offsetsRetentionMs) { |
| config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, offsetsRetentionMs); |
| return this; |
| } |
| |
| Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) { |
| this.groupMetadataManager = groupMetadataManager; |
| return this; |
| } |
| |
| OffsetMetadataManagerTestContext build() { |
| if (metadataImage == null) metadataImage = MetadataImage.EMPTY; |
| if (config == null) { |
| config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24 * 60 * 1000); |
| } |
| |
| if (groupMetadataManager == null) { |
| groupMetadataManager = new GroupMetadataManager.Builder() |
| .withTime(time) |
| .withTimer(timer) |
| .withSnapshotRegistry(snapshotRegistry) |
| .withLogContext(logContext) |
| .withMetadataImage(metadataImage) |
| .withConsumerGroupAssignors(Collections.singletonList(new RangeAssignor())) |
| .withGroupCoordinatorMetricsShard(metrics) |
| .build(); |
| } |
| |
| OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder() |
| .withTime(time) |
| .withLogContext(logContext) |
| .withSnapshotRegistry(snapshotRegistry) |
| .withMetadataImage(metadataImage) |
| .withGroupMetadataManager(groupMetadataManager) |
| .withGroupCoordinatorConfig(config) |
| .withGroupCoordinatorMetricsShard(metrics) |
| .build(); |
| |
| return new OffsetMetadataManagerTestContext( |
| time, |
| timer, |
| snapshotRegistry, |
| metrics, |
| groupMetadataManager, |
| offsetMetadataManager |
| ); |
| } |
| } |
| |
| final MockTime time; |
| final MockCoordinatorTimer<Void, Record> timer; |
| final SnapshotRegistry snapshotRegistry; |
| final GroupCoordinatorMetricsShard metrics; |
| final GroupMetadataManager groupMetadataManager; |
| final OffsetMetadataManager offsetMetadataManager; |
| |
| long lastCommittedOffset = 0L; |
| long lastWrittenOffset = 0L; |
| |
| OffsetMetadataManagerTestContext( |
| MockTime time, |
| MockCoordinatorTimer<Void, Record> timer, |
| SnapshotRegistry snapshotRegistry, |
| GroupCoordinatorMetricsShard metrics, |
| GroupMetadataManager groupMetadataManager, |
| OffsetMetadataManager offsetMetadataManager |
| ) { |
| this.time = time; |
| this.timer = timer; |
| this.snapshotRegistry = snapshotRegistry; |
| this.metrics = metrics; |
| this.groupMetadataManager = groupMetadataManager; |
| this.offsetMetadataManager = offsetMetadataManager; |
| } |
| |
| public Group getOrMaybeCreateGroup( |
| Group.GroupType groupType, |
| String groupId |
| ) { |
| switch (groupType) { |
| case CLASSIC: |
| return groupMetadataManager.getOrMaybeCreateClassicGroup( |
| groupId, |
| true |
| ); |
| case CONSUMER: |
| return groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| groupId, |
| true |
| ); |
| default: |
| throw new IllegalArgumentException("Invalid group type: " + groupType); |
| } |
| } |
| |
| public void commit() { |
| long lastCommittedOffset = this.lastCommittedOffset; |
| this.lastCommittedOffset = lastWrittenOffset; |
| snapshotRegistry.deleteSnapshotsUpTo(lastCommittedOffset); |
| } |
| |
| public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( |
| OffsetCommitRequestData request |
| ) { |
| return commitOffset(ApiKeys.OFFSET_COMMIT.latestVersion(), request); |
| } |
| |
| public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( |
| short version, |
| OffsetCommitRequestData request |
| ) { |
| RequestContext context = new RequestContext( |
| new RequestHeader( |
| ApiKeys.OFFSET_COMMIT, |
| version, |
| "client", |
| 0 |
| ), |
| "1", |
| InetAddress.getLoopbackAddress(), |
| KafkaPrincipal.ANONYMOUS, |
| ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), |
| SecurityProtocol.PLAINTEXT, |
| ClientInformation.EMPTY, |
| false |
| ); |
| |
| CoordinatorResult<OffsetCommitResponseData, Record> result = offsetMetadataManager.commitOffset( |
| context, |
| request |
| ); |
| |
| result.records().forEach(this::replay); |
| return result; |
| } |
| |
| public CoordinatorResult<TxnOffsetCommitResponseData, Record> commitTransactionalOffset( |
| TxnOffsetCommitRequestData request |
| ) { |
| RequestContext context = new RequestContext( |
| new RequestHeader( |
| ApiKeys.TXN_OFFSET_COMMIT, |
| ApiKeys.TXN_OFFSET_COMMIT.latestVersion(), |
| "client", |
| 0 |
| ), |
| "1", |
| InetAddress.getLoopbackAddress(), |
| KafkaPrincipal.ANONYMOUS, |
| ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), |
| SecurityProtocol.PLAINTEXT, |
| ClientInformation.EMPTY, |
| false |
| ); |
| |
| CoordinatorResult<TxnOffsetCommitResponseData, Record> result = offsetMetadataManager.commitTransactionalOffset( |
| context, |
| request |
| ); |
| |
| result.records().forEach(record -> replay( |
| request.producerId(), |
| record |
| )); |
| |
| return result; |
| } |
| |
| public List<Record> deletePartitions( |
| List<TopicPartition> topicPartitions |
| ) { |
| List<Record> records = offsetMetadataManager.onPartitionsDeleted(topicPartitions); |
| records.forEach(this::replay); |
| return records; |
| } |
| |
| public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets( |
| OffsetDeleteRequestData request |
| ) { |
| CoordinatorResult<OffsetDeleteResponseData, Record> result = offsetMetadataManager.deleteOffsets(request); |
| result.records().forEach(this::replay); |
| return result; |
| } |
| |
| public int deleteAllOffsets( |
| String groupId, |
| List<Record> records |
| ) { |
| List<Record> addedRecords = new ArrayList<>(); |
| int numDeletedOffsets = offsetMetadataManager.deleteAllOffsets(groupId, addedRecords); |
| addedRecords.forEach(this::replay); |
| |
| records.addAll(addedRecords); |
| return numDeletedOffsets; |
| } |
| |
| public boolean cleanupExpiredOffsets(String groupId, List<Record> records) { |
| List<Record> addedRecords = new ArrayList<>(); |
| boolean isOffsetsEmptyForGroup = offsetMetadataManager.cleanupExpiredOffsets(groupId, addedRecords); |
| addedRecords.forEach(this::replay); |
| |
| records.addAll(addedRecords); |
| return isOffsetsEmptyForGroup; |
| } |
| |
| public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets( |
| String groupId, |
| List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics, |
| long committedOffset |
| ) { |
| return fetchOffsets( |
| groupId, |
| null, |
| -1, |
| topics, |
| committedOffset |
| ); |
| } |
| |
| public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets( |
| String groupId, |
| String memberId, |
| int memberEpoch, |
| List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics, |
| long committedOffset |
| ) { |
| OffsetFetchResponseData.OffsetFetchResponseGroup response = offsetMetadataManager.fetchOffsets( |
| new OffsetFetchRequestData.OffsetFetchRequestGroup() |
| .setGroupId(groupId) |
| .setMemberId(memberId) |
| .setMemberEpoch(memberEpoch) |
| .setTopics(topics), |
| committedOffset |
| ); |
| assertEquals(groupId, response.groupId()); |
| return response.topics(); |
| } |
| |
| public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets( |
| String groupId, |
| long committedOffset |
| ) { |
| return fetchAllOffsets( |
| groupId, |
| null, |
| -1, |
| committedOffset |
| ); |
| } |
| |
| public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets( |
| String groupId, |
| String memberId, |
| int memberEpoch, |
| long committedOffset |
| ) { |
| OffsetFetchResponseData.OffsetFetchResponseGroup response = offsetMetadataManager.fetchAllOffsets( |
| new OffsetFetchRequestData.OffsetFetchRequestGroup() |
| .setGroupId(groupId) |
| .setMemberId(memberId) |
| .setMemberEpoch(memberEpoch), |
| committedOffset |
| ); |
| assertEquals(groupId, response.groupId()); |
| return response.topics(); |
| } |
| |
| public List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> sleep(long ms) { |
| time.sleep(ms); |
| List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> timeouts = timer.poll(); |
| timeouts.forEach(timeout -> { |
| if (timeout.result.replayRecords()) { |
| timeout.result.records().forEach(this::replay); |
| } |
| }); |
| return timeouts; |
| } |
| |
| public void commitOffset( |
| String groupId, |
| String topic, |
| int partition, |
| long offset, |
| int leaderEpoch |
| ) { |
| commitOffset( |
| groupId, |
| topic, |
| partition, |
| offset, |
| leaderEpoch, |
| time.milliseconds() |
| ); |
| } |
| |
| public void commitOffset( |
| String groupId, |
| String topic, |
| int partition, |
| long offset, |
| int leaderEpoch, |
| long commitTimestamp |
| ) { |
| commitOffset( |
| RecordBatch.NO_PRODUCER_ID, |
| groupId, |
| topic, |
| partition, |
| offset, |
| leaderEpoch, |
| commitTimestamp |
| ); |
| } |
| |
| public void commitOffset( |
| long producerId, |
| String groupId, |
| String topic, |
| int partition, |
| long offset, |
| int leaderEpoch, |
| long commitTimestamp |
| ) { |
| replay(producerId, RecordHelpers.newOffsetCommitRecord( |
| groupId, |
| topic, |
| partition, |
| new OffsetAndMetadata( |
| offset, |
| OptionalInt.of(leaderEpoch), |
| "metadata", |
| commitTimestamp, |
| OptionalLong.empty() |
| ), |
| MetadataVersion.latestTesting() |
| )); |
| } |
| |
| public void deleteOffset( |
| String groupId, |
| String topic, |
| int partition |
| ) { |
| replay(RecordHelpers.newOffsetCommitTombstoneRecord( |
| groupId, |
| topic, |
| partition |
| )); |
| } |
| |
| private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) { |
| if (apiMessageAndVersion == null) { |
| return null; |
| } else { |
| return apiMessageAndVersion.message(); |
| } |
| } |
| |
| private void replay( |
| Record record |
| ) { |
| replay( |
| RecordBatch.NO_PRODUCER_ID, |
| record |
| ); |
| } |
| |
| private void replay( |
| long producerId, |
| Record record |
| ) { |
| snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset); |
| |
| ApiMessageAndVersion key = record.key(); |
| ApiMessageAndVersion value = record.value(); |
| |
| if (key == null) { |
| throw new IllegalStateException("Received a null key in " + record); |
| } |
| |
| switch (key.version()) { |
| case OffsetCommitKey.HIGHEST_SUPPORTED_VERSION: |
| offsetMetadataManager.replay( |
| lastWrittenOffset, |
| producerId, |
| (OffsetCommitKey) key.message(), |
| (OffsetCommitValue) messageOrNull(value) |
| ); |
| break; |
| |
| default: |
| throw new IllegalStateException("Received an unknown record type " + key.version() |
| + " in " + record); |
| } |
| |
| lastWrittenOffset++; |
| } |
| |
| private void replayEndTransactionMarker( |
| long producerId, |
| TransactionResult result |
| ) { |
| snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset); |
| offsetMetadataManager.replayEndTransactionMarker(producerId, result); |
| lastWrittenOffset++; |
| } |
| |
| public void testOffsetDeleteWith( |
| String groupId, |
| String topic, |
| int partition, |
| Errors expectedError |
| ) { |
| final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = |
| new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( |
| new OffsetDeleteRequestData.OffsetDeleteRequestTopic() |
| .setName(topic) |
| .setPartitions(Collections.singletonList( |
| new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition) |
| )) |
| ).iterator()); |
| |
| final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection expectedResponsePartitionCollection = |
| new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); |
| expectedResponsePartitionCollection.add( |
| new OffsetDeleteResponseData.OffsetDeleteResponsePartition() |
| .setPartitionIndex(partition) |
| .setErrorCode(expectedError.code()) |
| ); |
| |
| final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection expectedResponseTopicCollection = |
| new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList( |
| new OffsetDeleteResponseData.OffsetDeleteResponseTopic() |
| .setName(topic) |
| .setPartitions(expectedResponsePartitionCollection) |
| ).iterator()); |
| |
| List<Record> expectedRecords = Collections.emptyList(); |
| if (hasOffset(groupId, topic, partition) && expectedError == Errors.NONE) { |
| expectedRecords = Collections.singletonList( |
| RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition) |
| ); |
| } |
| |
| final CoordinatorResult<OffsetDeleteResponseData, Record> coordinatorResult = deleteOffsets( |
| new OffsetDeleteRequestData() |
| .setGroupId(groupId) |
| .setTopics(requestTopicCollection) |
| ); |
| |
| assertEquals(new OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection), coordinatorResult.response()); |
| assertEquals(expectedRecords, coordinatorResult.records()); |
| } |
| |
| public boolean hasOffset( |
| String groupId, |
| String topic, |
| int partition |
| ) { |
| return offsetMetadataManager.hasCommittedOffset(groupId, topic, partition) || |
| offsetMetadataManager.hasPendingTransactionalOffsets(groupId, topic, partition); |
| } |
| } |
| |
| @ParameterizedTest |
| @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) |
| public void testOffsetCommitWithUnknownGroup(short version) { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| Class<? extends Throwable> expectedType; |
| if (version >= 9) { |
| expectedType = GroupIdNotFoundException.class; |
| } else { |
| expectedType = IllegalGenerationException.class; |
| } |
| |
| // Verify that the request is rejected with the correct exception. |
| assertThrows(expectedType, () -> context.commitOffset( |
| version, |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(10) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ) |
| ); |
| } |
| |
| @Test |
| public void testGenericGroupOffsetCommitWithDeadGroup() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create a dead group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| group.transitionTo(ClassicGroupState.DEAD); |
| |
| // Verify that the request is rejected with the correct exception. |
| assertThrows(CoordinatorNotAvailableException.class, () -> context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(10) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ) |
| ); |
| } |
| |
| @Test |
| public void testGenericGroupOffsetCommitWithUnknownMemberId() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| |
| // Verify that the request is rejected with the correct exception. |
| assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(10) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ) |
| ); |
| } |
| |
| @Test |
| public void testGenericGroupOffsetCommitWithIllegalGeneration() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| group.add(mkGenericMember("member", Optional.of("new-instance-id"))); |
| |
| // Transition to next generation. |
| group.transitionTo(ClassicGroupState.PREPARING_REBALANCE); |
| group.initNextGeneration(); |
| assertEquals(1, group.generationId()); |
| |
| // Verify that the request is rejected with the correct exception. |
| assertThrows(IllegalGenerationException.class, () -> context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(10) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ) |
| ); |
| } |
| |
| @Test |
| public void testGenericGroupOffsetCommitWithUnknownInstanceId() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member without static id. |
| group.add(mkGenericMember("member", Optional.empty())); |
| |
| // Verify that the request is rejected with the correct exception. |
| assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGroupInstanceId("instanceid") |
| .setGenerationIdOrMemberEpoch(10) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ) |
| ); |
| } |
| |
| @Test |
| public void testGenericGroupOffsetCommitWithFencedInstanceId() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member with static id. |
| group.add(mkGenericMember("member", Optional.of("new-instance-id"))); |
| |
| // Verify that the request is rejected with the correct exception. |
| assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGroupInstanceId("old-instance-id") |
| .setGenerationIdOrMemberEpoch(10) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ) |
| ); |
| } |
| |
| @Test |
| public void testGenericGroupOffsetCommitWhileInCompletingRebalanceState() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| group.add(mkGenericMember("member", Optional.of("new-instance-id"))); |
| |
| // Transition to next generation. |
| group.transitionTo(ClassicGroupState.PREPARING_REBALANCE); |
| group.initNextGeneration(); |
| assertEquals(1, group.generationId()); |
| |
| // Verify that the request is rejected with the correct exception. |
| assertThrows(RebalanceInProgressException.class, () -> context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(1) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ) |
| ); |
| } |
| |
| @Test |
| public void testGenericGroupOffsetCommitWithoutMemberIdAndGeneration() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| group.add(mkGenericMember("member", Optional.of("new-instance-id"))); |
| |
| // Transition to next generation. |
| group.transitionTo(ClassicGroupState.PREPARING_REBALANCE); |
| group.initNextGeneration(); |
| assertEquals(1, group.generationId()); |
| |
| // Verify that the request is rejected with the correct exception. |
| assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ) |
| ); |
| } |
| |
| @Test |
| public void testGenericGroupOffsetCommitWithRetentionTime() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| group.add(mkGenericMember("member", Optional.of("new-instance-id"))); |
| |
| // Transition to next generation. |
| group.transitionTo(ClassicGroupState.PREPARING_REBALANCE); |
| group.initNextGeneration(); |
| assertEquals(1, group.generationId()); |
| group.transitionTo(ClassicGroupState.STABLE); |
| |
| CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(1) |
| .setRetentionTimeMs(1234L) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ); |
| |
| assertEquals( |
| new OffsetCommitResponseData() |
| .setTopics(Collections.singletonList( |
| new OffsetCommitResponseData.OffsetCommitResponseTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code()) |
| )) |
| )), |
| result.response() |
| ); |
| |
| assertEquals( |
| Collections.singletonList(RecordHelpers.newOffsetCommitRecord( |
| "foo", |
| "bar", |
| 0, |
| new OffsetAndMetadata( |
| 100L, |
| OptionalInt.empty(), |
| "", |
| context.time.milliseconds(), |
| OptionalLong.of(context.time.milliseconds() + 1234L) |
| ), |
| MetadataImage.EMPTY.features().metadataVersion() |
| )), |
| result.records() |
| ); |
| } |
| |
| @Test |
| public void testGenericGroupOffsetCommitMaintainsSession() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create a group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| ClassicGroupMember member = mkGenericMember("member", Optional.empty()); |
| group.add(member); |
| |
| // Transition to next generation. |
| group.transitionTo(ClassicGroupState.PREPARING_REBALANCE); |
| group.initNextGeneration(); |
| assertEquals(1, group.generationId()); |
| group.transitionTo(ClassicGroupState.STABLE); |
| |
| // Schedule session timeout. This would be normally done when |
| // the group transitions to stable. |
| context.groupMetadataManager.rescheduleClassicGroupMemberHeartbeat(group, member); |
| |
| // Advance time by half of the session timeout. No timeouts are |
| // expired. |
| assertEquals(Collections.emptyList(), context.sleep(5000 / 2)); |
| |
| // Commit. |
| context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(1) |
| .setRetentionTimeMs(1234L) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ); |
| |
| // Advance time by half of the session timeout. No timeouts are |
| // expired. |
| assertEquals(Collections.emptyList(), context.sleep(5000 / 2)); |
| |
| // Advance time by half of the session timeout again. The timeout should |
| // expire and the member is removed from the group. |
| List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> timeouts = |
| context.sleep(5000 / 2); |
| assertEquals(1, timeouts.size()); |
| assertFalse(group.hasMemberId(member.memberId())); |
| } |
| |
| @Test |
| public void testSimpleGroupOffsetCommit() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ); |
| |
| assertEquals( |
| new OffsetCommitResponseData() |
| .setTopics(Collections.singletonList( |
| new OffsetCommitResponseData.OffsetCommitResponseTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code()) |
| )) |
| )), |
| result.response() |
| ); |
| |
| assertEquals( |
| Collections.singletonList(RecordHelpers.newOffsetCommitRecord( |
| "foo", |
| "bar", |
| 0, |
| new OffsetAndMetadata( |
| 100L, |
| OptionalInt.empty(), |
| "", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| ), |
| MetadataImage.EMPTY.features().metadataVersion() |
| )), |
| result.records() |
| ); |
| |
| // A generic should have been created. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| false |
| ); |
| assertNotNull(group); |
| assertEquals("foo", group.groupId()); |
| } |
| |
| @Test |
| public void testSimpleGroupOffsetCommitWithInstanceId() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| // Instance id should be ignored. |
| .setGroupInstanceId("instance-id") |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ); |
| |
| assertEquals( |
| new OffsetCommitResponseData() |
| .setTopics(Collections.singletonList( |
| new OffsetCommitResponseData.OffsetCommitResponseTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code()) |
| )) |
| )), |
| result.response() |
| ); |
| |
| assertEquals( |
| Collections.singletonList(RecordHelpers.newOffsetCommitRecord( |
| "foo", |
| "bar", |
| 0, |
| new OffsetAndMetadata( |
| 100L, |
| OptionalInt.empty(), |
| "", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| ), |
| MetadataImage.EMPTY.features().metadataVersion() |
| )), |
| result.records() |
| ); |
| } |
| |
| @Test |
| public void testConsumerGroupOffsetCommitWithUnknownMemberId() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| "foo", |
| true |
| ); |
| |
| // Verify that the request is rejected with the correct exception. |
| assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(10) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ) |
| ); |
| } |
| |
| @Test |
| public void testConsumerGroupOffsetCommitWithStaleMemberEpoch() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| group.updateMember(new ConsumerGroupMember.Builder("member") |
| .setMemberEpoch(10) |
| .setPreviousMemberEpoch(10) |
| .build() |
| ); |
| |
| OffsetCommitRequestData request = new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(9) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )); |
| |
| // Verify that a smaller epoch is rejected. |
| assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request)); |
| |
| // Verify that a larger epoch is rejected. |
| request.setGenerationIdOrMemberEpoch(11); |
| assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request)); |
| } |
| |
| @ParameterizedTest |
| @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) |
| public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short version) { |
| // All the newer versions are fine. |
| if (version >= 9) return; |
| // Version 0 does not support MemberId and GenerationIdOrMemberEpoch fields. |
| if (version == 0) return; |
| |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| group.updateMember(new ConsumerGroupMember.Builder("member") |
| .setMemberEpoch(10) |
| .setPreviousMemberEpoch(10) |
| .build() |
| ); |
| |
| // Verify that the request is rejected with the correct exception. |
| assertThrows(UnsupportedVersionException.class, () -> context.commitOffset( |
| version, |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(9) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ) |
| ); |
| } |
| |
| @Test |
| public void testConsumerGroupOffsetCommitFromAdminClient() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| "foo", |
| true |
| ); |
| |
| CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| )) |
| )) |
| ); |
| |
| assertEquals( |
| new OffsetCommitResponseData() |
| .setTopics(Collections.singletonList( |
| new OffsetCommitResponseData.OffsetCommitResponseTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code()) |
| )) |
| )), |
| result.response() |
| ); |
| |
| assertEquals( |
| Collections.singletonList(RecordHelpers.newOffsetCommitRecord( |
| "foo", |
| "bar", |
| 0, |
| new OffsetAndMetadata( |
| 100L, |
| OptionalInt.empty(), |
| "", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| ), |
| MetadataImage.EMPTY.features().metadataVersion() |
| )), |
| result.records() |
| ); |
| } |
| |
| @Test |
| public void testConsumerGroupOffsetCommit() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| group.updateMember(new ConsumerGroupMember.Builder("member") |
| .setMemberEpoch(10) |
| .setPreviousMemberEpoch(10) |
| .build() |
| ); |
| |
| CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(10) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| .setCommittedLeaderEpoch(10) |
| .setCommittedMetadata("metadata") |
| .setCommitTimestamp(context.time.milliseconds()) |
| )) |
| )) |
| ); |
| |
| assertEquals( |
| new OffsetCommitResponseData() |
| .setTopics(Collections.singletonList( |
| new OffsetCommitResponseData.OffsetCommitResponseTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code()) |
| )) |
| )), |
| result.response() |
| ); |
| |
| assertEquals( |
| Collections.singletonList(RecordHelpers.newOffsetCommitRecord( |
| "foo", |
| "bar", |
| 0, |
| new OffsetAndMetadata( |
| 100L, |
| OptionalInt.of(10), |
| "metadata", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| ), |
| MetadataImage.EMPTY.features().metadataVersion() |
| )), |
| result.records() |
| ); |
| } |
| |
| @Test |
| public void testConsumerGroupOffsetCommitWithOffsetMetadataTooLarge() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() |
| .withOffsetMetadataMaxSize(5) |
| .build(); |
| |
| // Create an empty group. |
| ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| group.updateMember(new ConsumerGroupMember.Builder("member") |
| .setMemberEpoch(10) |
| .setPreviousMemberEpoch(10) |
| .build() |
| ); |
| |
| CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(10) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| .setCommittedLeaderEpoch(10) |
| .setCommittedMetadata("toolarge") |
| .setCommitTimestamp(context.time.milliseconds()), |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(1) |
| .setCommittedOffset(100L) |
| .setCommittedLeaderEpoch(10) |
| .setCommittedMetadata("small") |
| .setCommitTimestamp(context.time.milliseconds()) |
| )) |
| )) |
| ); |
| |
| assertEquals( |
| new OffsetCommitResponseData() |
| .setTopics(Collections.singletonList( |
| new OffsetCommitResponseData.OffsetCommitResponseTopic() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()), |
| new OffsetCommitResponseData.OffsetCommitResponsePartition() |
| .setPartitionIndex(1) |
| .setErrorCode(Errors.NONE.code()) |
| )) |
| )), |
| result.response() |
| ); |
| |
| assertEquals( |
| Collections.singletonList(RecordHelpers.newOffsetCommitRecord( |
| "foo", |
| "bar", |
| 1, |
| new OffsetAndMetadata( |
| 100L, |
| OptionalInt.of(10), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| ), |
| MetadataImage.EMPTY.features().metadataVersion() |
| )), |
| result.records() |
| ); |
| } |
| |
| @Test |
| public void testConsumerGroupTransactionalOffsetCommit() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| group.updateMember(new ConsumerGroupMember.Builder("member") |
| .setMemberEpoch(10) |
| .setPreviousMemberEpoch(10) |
| .build() |
| ); |
| |
| CoordinatorResult<TxnOffsetCommitResponseData, Record> result = context.commitTransactionalOffset( |
| new TxnOffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationId(10) |
| .setTopics(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| .setCommittedLeaderEpoch(10) |
| .setCommittedMetadata("metadata") |
| )) |
| )) |
| ); |
| |
| assertEquals( |
| new TxnOffsetCommitResponseData() |
| .setTopics(Collections.singletonList( |
| new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code()) |
| )) |
| )), |
| result.response() |
| ); |
| |
| assertEquals( |
| Collections.singletonList(RecordHelpers.newOffsetCommitRecord( |
| "foo", |
| "bar", |
| 0, |
| new OffsetAndMetadata( |
| 100L, |
| OptionalInt.of(10), |
| "metadata", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| ), |
| MetadataImage.EMPTY.features().metadataVersion() |
| )), |
| result.records() |
| ); |
| } |
| |
| @Test |
| public void testConsumerGroupTransactionalOffsetCommitWithUnknownGroupId() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| assertThrows(IllegalGenerationException.class, () -> context.commitTransactionalOffset( |
| new TxnOffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationId(10) |
| .setTopics(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| .setCommittedLeaderEpoch(10) |
| .setCommittedMetadata("metadata") |
| )) |
| )) |
| )); |
| } |
| |
| @Test |
| public void testConsumerGroupTransactionalOffsetCommitWithUnknownMemberId() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| "foo", |
| true |
| ); |
| |
| assertThrows(UnknownMemberIdException.class, () -> context.commitTransactionalOffset( |
| new TxnOffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationId(10) |
| .setTopics(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| .setCommittedLeaderEpoch(10) |
| .setCommittedMetadata("metadata") |
| )) |
| )) |
| )); |
| } |
| |
| @Test |
| public void testConsumerGroupTransactionalOffsetCommitWithStaleMemberEpoch() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| group.updateMember(new ConsumerGroupMember.Builder("member") |
| .setMemberEpoch(10) |
| .setPreviousMemberEpoch(10) |
| .build() |
| ); |
| |
| assertThrows(IllegalGenerationException.class, () -> context.commitTransactionalOffset( |
| new TxnOffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationId(100) |
| .setTopics(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| .setCommittedLeaderEpoch(10) |
| .setCommittedMetadata("metadata") |
| )) |
| )) |
| )); |
| } |
| |
| @Test |
| public void testGenericGroupTransactionalOffsetCommit() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create a group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| ClassicGroupMember member = mkGenericMember("member", Optional.empty()); |
| group.add(member); |
| |
| // Transition to next generation. |
| group.transitionTo(ClassicGroupState.PREPARING_REBALANCE); |
| group.initNextGeneration(); |
| assertEquals(1, group.generationId()); |
| group.transitionTo(ClassicGroupState.STABLE); |
| |
| CoordinatorResult<TxnOffsetCommitResponseData, Record> result = context.commitTransactionalOffset( |
| new TxnOffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationId(1) |
| .setTopics(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| .setCommittedLeaderEpoch(10) |
| .setCommittedMetadata("metadata") |
| )) |
| )) |
| ); |
| |
| assertEquals( |
| new TxnOffsetCommitResponseData() |
| .setTopics(Collections.singletonList( |
| new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() |
| .setPartitionIndex(0) |
| .setErrorCode(Errors.NONE.code()) |
| )) |
| )), |
| result.response() |
| ); |
| |
| assertEquals( |
| Collections.singletonList(RecordHelpers.newOffsetCommitRecord( |
| "foo", |
| "bar", |
| 0, |
| new OffsetAndMetadata( |
| 100L, |
| OptionalInt.of(10), |
| "metadata", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| ), |
| MetadataImage.EMPTY.features().metadataVersion() |
| )), |
| result.records() |
| ); |
| } |
| |
| @Test |
| public void testGenericGroupTransactionalOffsetCommitWithUnknownGroupId() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| assertThrows(IllegalGenerationException.class, () -> context.commitTransactionalOffset( |
| new TxnOffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationId(10) |
| .setTopics(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| .setCommittedLeaderEpoch(10) |
| .setCommittedMetadata("metadata") |
| )) |
| )) |
| )); |
| } |
| |
| @Test |
| public void testGenericGroupTransactionalOffsetCommitWithUnknownMemberId() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| |
| assertThrows(UnknownMemberIdException.class, () -> context.commitTransactionalOffset( |
| new TxnOffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationId(10) |
| .setTopics(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| .setCommittedLeaderEpoch(10) |
| .setCommittedMetadata("metadata") |
| )) |
| )) |
| )); |
| } |
| |
| @Test |
| public void testGenericGroupTransactionalOffsetCommitWithIllegalGenerationId() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create a group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| ClassicGroupMember member = mkGenericMember("member", Optional.empty()); |
| group.add(member); |
| |
| // Transition to next generation. |
| group.transitionTo(ClassicGroupState.PREPARING_REBALANCE); |
| group.initNextGeneration(); |
| assertEquals(1, group.generationId()); |
| group.transitionTo(ClassicGroupState.STABLE); |
| |
| assertThrows(IllegalGenerationException.class, () -> context.commitTransactionalOffset( |
| new TxnOffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationId(100) |
| .setTopics(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L) |
| .setCommittedLeaderEpoch(10) |
| .setCommittedMetadata("metadata") |
| )) |
| )) |
| )); |
| } |
| |
| @Test |
| public void testGenericGroupFetchOffsetsWithDeadGroup() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create a dead group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "group", |
| true |
| ); |
| group.transitionTo(ClassicGroupState.DEAD); |
| |
| List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList( |
| new OffsetFetchRequestData.OffsetFetchRequestTopics() |
| .setName("foo") |
| .setPartitionIndexes(Arrays.asList(0, 1)), |
| new OffsetFetchRequestData.OffsetFetchRequestTopics() |
| .setName("bar") |
| .setPartitionIndexes(Collections.singletonList(0)) |
| ); |
| |
| List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedResponse = Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkInvalidOffsetPartitionResponse(0), |
| mkInvalidOffsetPartitionResponse(1) |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| mkInvalidOffsetPartitionResponse(0) |
| )) |
| ); |
| |
| assertEquals(expectedResponse, context.fetchOffsets("group", request, Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void testFetchOffsetsWithUnknownGroup() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList( |
| new OffsetFetchRequestData.OffsetFetchRequestTopics() |
| .setName("foo") |
| .setPartitionIndexes(Arrays.asList(0, 1)), |
| new OffsetFetchRequestData.OffsetFetchRequestTopics() |
| .setName("bar") |
| .setPartitionIndexes(Collections.singletonList(0)) |
| ); |
| |
| List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedResponse = Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkInvalidOffsetPartitionResponse(0), |
| mkInvalidOffsetPartitionResponse(1) |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Collections.singletonList( |
| mkInvalidOffsetPartitionResponse(0) |
| )) |
| ); |
| |
| assertEquals(expectedResponse, context.fetchOffsets("group", request, Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void testFetchOffsetsAtDifferentCommittedOffset() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true); |
| |
| assertEquals(0, context.lastWrittenOffset); |
| context.commitOffset("group", "foo", 0, 100L, 1); |
| assertEquals(1, context.lastWrittenOffset); |
| context.commitOffset("group", "foo", 1, 110L, 1); |
| assertEquals(2, context.lastWrittenOffset); |
| context.commitOffset("group", "bar", 0, 200L, 1); |
| assertEquals(3, context.lastWrittenOffset); |
| context.commitOffset("group", "foo", 1, 111L, 2); |
| assertEquals(4, context.lastWrittenOffset); |
| context.commitOffset("group", "bar", 1, 210L, 2); |
| assertEquals(5, context.lastWrittenOffset); |
| |
| // Always use the same request. |
| List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList( |
| new OffsetFetchRequestData.OffsetFetchRequestTopics() |
| .setName("foo") |
| .setPartitionIndexes(Arrays.asList(0, 1)), |
| new OffsetFetchRequestData.OffsetFetchRequestTopics() |
| .setName("bar") |
| .setPartitionIndexes(Arrays.asList(0, 1)) |
| ); |
| |
| // Fetching with 0 should return all invalid offsets. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkInvalidOffsetPartitionResponse(0), |
| mkInvalidOffsetPartitionResponse(1) |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkInvalidOffsetPartitionResponse(0), |
| mkInvalidOffsetPartitionResponse(1) |
| )) |
| ), context.fetchOffsets("group", request, 0L)); |
| |
| // Fetching with 1 should return data up to offset 1. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkInvalidOffsetPartitionResponse(1) |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkInvalidOffsetPartitionResponse(0), |
| mkInvalidOffsetPartitionResponse(1) |
| )) |
| ), context.fetchOffsets("group", request, 1L)); |
| |
| // Fetching with 2 should return data up to offset 2. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 110L, 1, "metadata") |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkInvalidOffsetPartitionResponse(0), |
| mkInvalidOffsetPartitionResponse(1) |
| )) |
| ), context.fetchOffsets("group", request, 2L)); |
| |
| // Fetching with 3 should return data up to offset 3. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 110L, 1, "metadata") |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 200L, 1, "metadata"), |
| mkInvalidOffsetPartitionResponse(1) |
| )) |
| ), context.fetchOffsets("group", request, 3L)); |
| |
| // Fetching with 4 should return data up to offset 4. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 111L, 2, "metadata") |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 200L, 1, "metadata"), |
| mkInvalidOffsetPartitionResponse(1) |
| )) |
| ), context.fetchOffsets("group", request, 4L)); |
| |
| // Fetching with 5 should return data up to offset 5. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 111L, 2, "metadata") |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 200L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 210L, 2, "metadata") |
| )) |
| ), context.fetchOffsets("group", request, 5L)); |
| |
| // Fetching with Long.MAX_VALUE should return all offsets. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 111L, 2, "metadata") |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 200L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 210L, 2, "metadata") |
| )) |
| ), context.fetchOffsets("group", request, Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void testFetchOffsetsWithPendingTransactionalOffsets() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true); |
| |
| context.commitOffset("group", "foo", 0, 100L, 1); |
| context.commitOffset("group", "foo", 1, 110L, 1); |
| context.commitOffset("group", "bar", 0, 200L, 1); |
| |
| context.commit(); |
| |
| assertEquals(3, context.lastWrittenOffset); |
| assertEquals(3, context.lastCommittedOffset); |
| |
| context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds()); |
| context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds()); |
| // Note that bar-1 does not exist in the initial commits. UNSTABLE_OFFSET_COMMIT errors |
| // must be returned in this case too. |
| context.commitOffset(10L, "group", "bar", 1, 211L, 1, context.time.milliseconds()); |
| |
| // Always use the same request. |
| List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList( |
| new OffsetFetchRequestData.OffsetFetchRequestTopics() |
| .setName("foo") |
| .setPartitionIndexes(Arrays.asList(0, 1)), |
| new OffsetFetchRequestData.OffsetFetchRequestTopics() |
| .setName("bar") |
| .setPartitionIndexes(Arrays.asList(0, 1)) |
| ); |
| |
| // Fetching offsets with "require stable" (Long.MAX_VALUE) should return the committed offset for |
| // foo-0 and the UNSTABLE_OFFSET_COMMIT error for foo-1, bar-0 and bar-1. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, Errors.UNSTABLE_OFFSET_COMMIT) |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, Errors.UNSTABLE_OFFSET_COMMIT), |
| mkOffsetPartitionResponse(1, Errors.UNSTABLE_OFFSET_COMMIT) |
| )) |
| ), context.fetchOffsets("group", request, Long.MAX_VALUE)); |
| |
| // Fetching offsets without "require stable" (lastCommittedOffset) should return the committed |
| // offset for foo-0, foo-1 and bar-0 and the INVALID_OFFSET for bar-1. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 110L, 1, "metadata") |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 200L, 1, "metadata"), |
| mkInvalidOffsetPartitionResponse(1) |
| )) |
| ), context.fetchOffsets("group", request, context.lastCommittedOffset)); |
| |
| // Commit the ongoing transaction. |
| context.replayEndTransactionMarker(10L, TransactionResult.COMMIT); |
| |
| // Fetching offsets with "require stable" (Long.MAX_VALUE) should not return any errors now. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 111L, 1, "metadata") |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 201L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 211L, 1, "metadata") |
| )) |
| ), context.fetchOffsets("group", request, Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void testGenericGroupFetchAllOffsetsWithDeadGroup() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create a dead group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "group", |
| true |
| ); |
| group.transitionTo(ClassicGroupState.DEAD); |
| |
| assertEquals(Collections.emptyList(), context.fetchAllOffsets("group", Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void testFetchAllOffsetsWithUnknownGroup() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| assertEquals(Collections.emptyList(), context.fetchAllOffsets("group", Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void testFetchAllOffsetsAtDifferentCommittedOffset() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true); |
| |
| assertEquals(0, context.lastWrittenOffset); |
| context.commitOffset("group", "foo", 0, 100L, 1); |
| assertEquals(1, context.lastWrittenOffset); |
| context.commitOffset("group", "foo", 1, 110L, 1); |
| assertEquals(2, context.lastWrittenOffset); |
| context.commitOffset("group", "bar", 0, 200L, 1); |
| assertEquals(3, context.lastWrittenOffset); |
| context.commitOffset("group", "foo", 1, 111L, 2); |
| assertEquals(4, context.lastWrittenOffset); |
| context.commitOffset("group", "bar", 1, 210L, 2); |
| assertEquals(5, context.lastWrittenOffset); |
| |
| // Fetching with 0 should no offsets. |
| assertEquals(Collections.emptyList(), context.fetchAllOffsets("group", 0L)); |
| |
| // Fetching with 1 should return data up to offset 1. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata") |
| )) |
| ), context.fetchAllOffsets("group", 1L)); |
| |
| // Fetching with 2 should return data up to offset 2. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 110L, 1, "metadata") |
| )) |
| ), context.fetchAllOffsets("group", 2L)); |
| |
| // Fetching with 3 should return data up to offset 3. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 200L, 1, "metadata") |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 110L, 1, "metadata") |
| )) |
| ), context.fetchAllOffsets("group", 3L)); |
| |
| // Fetching with 4 should return data up to offset 4. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 200L, 1, "metadata") |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 111L, 2, "metadata") |
| )) |
| ), context.fetchAllOffsets("group", 4L)); |
| |
| // Fetching with Long.MAX_VALUE should return all offsets. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 200L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 210L, 2, "metadata") |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 111L, 2, "metadata") |
| )) |
| ), context.fetchAllOffsets("group", Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void testFetchAllOffsetsWithPendingTransactionalOffsets() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true); |
| |
| context.commitOffset("group", "foo", 0, 100L, 1); |
| context.commitOffset("group", "foo", 1, 110L, 1); |
| context.commitOffset("group", "bar", 0, 200L, 1); |
| |
| context.commit(); |
| |
| assertEquals(3, context.lastWrittenOffset); |
| assertEquals(3, context.lastCommittedOffset); |
| |
| context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds()); |
| context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds()); |
| // Note that bar-1 does not exist in the initial commits. The API does not return it at all until |
| // the transaction is committed. |
| context.commitOffset(10L, "group", "bar", 1, 211L, 1, context.time.milliseconds()); |
| |
| // Fetching offsets with "require stable" (Long.MAX_VALUE) should return the committed offset for |
| // foo-0 and the UNSTABLE_OFFSET_COMMIT error for foo-1 and bar-0. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, Errors.UNSTABLE_OFFSET_COMMIT) |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, Errors.UNSTABLE_OFFSET_COMMIT) |
| )) |
| ), context.fetchAllOffsets("group", Long.MAX_VALUE)); |
| |
| // Fetching offsets without "require stable" (lastCommittedOffset) should the committed |
| // offset for the foo-0, foo-1 and bar-0. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 200L, 1, "metadata") |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 110L, 1, "metadata") |
| )) |
| ), context.fetchAllOffsets("group", context.lastCommittedOffset)); |
| |
| // Commit the ongoing transaction. |
| context.replayEndTransactionMarker(10L, TransactionResult.COMMIT); |
| |
| // Fetching offsets with "require stable" (Long.MAX_VALUE) should not return any errors now. |
| assertEquals(Arrays.asList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 201L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 211L, 1, "metadata") |
| )), |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Arrays.asList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata"), |
| mkOffsetPartitionResponse(1, 111L, 1, "metadata") |
| )) |
| ), context.fetchAllOffsets("group", Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void testConsumerGroupOffsetFetchWithMemberIdAndEpoch() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| // Create consumer group. |
| ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true); |
| // Create member. |
| group.updateMember(new ConsumerGroupMember.Builder("member").build()); |
| // Commit offset. |
| context.commitOffset("group", "foo", 0, 100L, 1); |
| |
| // Fetch offsets case. |
| List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList( |
| new OffsetFetchRequestData.OffsetFetchRequestTopics() |
| .setName("foo") |
| .setPartitionIndexes(Collections.singletonList(0)) |
| ); |
| |
| assertEquals(Collections.singletonList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Collections.singletonList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata") |
| )) |
| ), context.fetchOffsets("group", "member", 0, topics, Long.MAX_VALUE)); |
| |
| // Fetch all offsets case. |
| assertEquals(Collections.singletonList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Collections.singletonList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata") |
| )) |
| ), context.fetchAllOffsets("group", "member", 0, Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void testConsumerGroupOffsetFetchFromAdminClient() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| // Create consumer group. |
| ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true); |
| // Create member. |
| group.getOrMaybeCreateMember("member", true); |
| // Commit offset. |
| context.commitOffset("group", "foo", 0, 100L, 1); |
| |
| // Fetch offsets case. |
| List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList( |
| new OffsetFetchRequestData.OffsetFetchRequestTopics() |
| .setName("foo") |
| .setPartitionIndexes(Collections.singletonList(0)) |
| ); |
| |
| assertEquals(Collections.singletonList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Collections.singletonList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata") |
| )) |
| ), context.fetchOffsets("group", topics, Long.MAX_VALUE)); |
| |
| // Fetch all offsets case. |
| assertEquals(Collections.singletonList( |
| new OffsetFetchResponseData.OffsetFetchResponseTopics() |
| .setName("foo") |
| .setPartitions(Collections.singletonList( |
| mkOffsetPartitionResponse(0, 100L, 1, "metadata") |
| )) |
| ), context.fetchAllOffsets("group", Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void testConsumerGroupOffsetFetchWithUnknownMemberId() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true); |
| |
| // Fetch offsets case. |
| List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList( |
| new OffsetFetchRequestData.OffsetFetchRequestTopics() |
| .setName("foo") |
| .setPartitionIndexes(Collections.singletonList(0)) |
| ); |
| |
| // Fetch offsets cases. |
| assertThrows(UnknownMemberIdException.class, |
| () -> context.fetchOffsets("group", "", 0, topics, Long.MAX_VALUE)); |
| assertThrows(UnknownMemberIdException.class, |
| () -> context.fetchOffsets("group", "member", 0, topics, Long.MAX_VALUE)); |
| |
| // Fetch all offsets cases. |
| assertThrows(UnknownMemberIdException.class, |
| () -> context.fetchAllOffsets("group", "", 0, Long.MAX_VALUE)); |
| assertThrows(UnknownMemberIdException.class, |
| () -> context.fetchAllOffsets("group", "member", 0, Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true); |
| group.updateMember(new ConsumerGroupMember.Builder("member").build()); |
| |
| // Fetch offsets case. |
| List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList( |
| new OffsetFetchRequestData.OffsetFetchRequestTopics() |
| .setName("foo") |
| .setPartitionIndexes(Collections.singletonList(0)) |
| ); |
| |
| // Fetch offsets case. |
| assertThrows(StaleMemberEpochException.class, |
| () -> context.fetchOffsets("group", "member", 10, topics, Long.MAX_VALUE)); |
| |
| // Fetch all offsets case. |
| assertThrows(StaleMemberEpochException.class, |
| () -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void testGenericGroupOffsetDelete() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| context.commitOffset("foo", "bar", 0, 100L, 0); |
| group.setSubscribedTopics(Optional.of(Collections.emptySet())); |
| context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE); |
| assertFalse(context.hasOffset("foo", "bar", 0)); |
| } |
| |
| @Test |
| public void testGenericGroupOffsetDeleteWithErrors() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| group.setSubscribedTopics(Optional.of(Collections.singleton("bar"))); |
| context.commitOffset("foo", "bar", 0, 100L, 0); |
| |
| // Delete the offset whose topic partition doesn't exist. |
| context.testOffsetDeleteWith("foo", "bar1", 0, Errors.NONE); |
| // Delete the offset from the topic that the group is subscribed to. |
| context.testOffsetDeleteWith("foo", "bar", 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC); |
| } |
| |
| @Test |
| public void testGenericGroupOffsetDeleteWithPendingTransactionalOffsets() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| context.commitOffset(10L, "foo", "bar", 0, 100L, 0, context.time.milliseconds()); |
| group.setSubscribedTopics(Optional.of(Collections.emptySet())); |
| context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE); |
| assertFalse(context.hasOffset("foo", "bar", 0)); |
| } |
| |
| @Test |
| public void testConsumerGroupOffsetDelete() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| "foo", |
| true |
| ); |
| context.commitOffset("foo", "bar", 0, 100L, 0); |
| assertFalse(group.isSubscribedToTopic("bar")); |
| context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE); |
| } |
| |
| @Test |
| public void testConsumerGroupOffsetDeleteWithErrors() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| "foo", |
| true |
| ); |
| MetadataImage image = new MetadataImageBuilder() |
| .addTopic(Uuid.randomUuid(), "foo", 1) |
| .addRacks() |
| .build(); |
| ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member1") |
| .setSubscribedTopicNames(Collections.singletonList("bar")) |
| .build(); |
| group.computeSubscriptionMetadata( |
| null, |
| member1, |
| image.topics(), |
| image.cluster() |
| ); |
| group.updateMember(member1); |
| context.commitOffset("foo", "bar", 0, 100L, 0); |
| assertTrue(group.isSubscribedToTopic("bar")); |
| |
| // Delete the offset whose topic partition doesn't exist. |
| context.testOffsetDeleteWith("foo", "bar1", 0, Errors.NONE); |
| // Delete the offset from the topic that the group is subscribed to. |
| context.testOffsetDeleteWith("foo", "bar", 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC); |
| } |
| |
| @Test |
| public void testConsumerGroupOffsetDeleteWithPendingTransactionalOffsets() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( |
| "foo", |
| true |
| ); |
| context.commitOffset(10L, "foo", "bar", 0, 100L, 0, context.time.milliseconds()); |
| assertFalse(group.isSubscribedToTopic("bar")); |
| context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE); |
| assertFalse(context.hasOffset("foo", "bar", 0)); |
| } |
| |
| @ParameterizedTest |
| @EnumSource(value = Group.GroupType.class, names = {"CLASSIC", "CONSUMER"}) |
| public void testDeleteGroupAllOffsets(Group.GroupType groupType) { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| context.getOrMaybeCreateGroup(groupType, "foo"); |
| |
| context.commitOffset("foo", "bar-0", 0, 100L, 0); |
| context.commitOffset("foo", "bar-0", 1, 100L, 0); |
| context.commitOffset("foo", "bar-1", 0, 100L, 0); |
| |
| List<Record> expectedRecords = Arrays.asList( |
| RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-1", 0), |
| RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 0), |
| RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 1) |
| ); |
| |
| List<Record> records = new ArrayList<>(); |
| int numDeleteOffsets = context.deleteAllOffsets("foo", records); |
| |
| assertEquals(expectedRecords, records); |
| assertEquals(3, numDeleteOffsets); |
| } |
| |
| @ParameterizedTest |
| @EnumSource(value = Group.GroupType.class, names = {"CLASSIC", "CONSUMER"}) |
| public void testDeleteGroupAllOffsetsWithPendingTransactionalOffsets(Group.GroupType groupType) { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| context.getOrMaybeCreateGroup(groupType, "foo"); |
| |
| context.commitOffset("foo", "bar-0", 0, 100L, 0); |
| context.commitOffset("foo", "bar-0", 1, 100L, 0); |
| context.commitOffset("foo", "bar-1", 0, 100L, 0); |
| |
| context.commitOffset(10L, "foo", "bar-1", 0, 101L, 0, context.time.milliseconds()); |
| context.commitOffset(10L, "foo", "bar-2", 0, 100L, 0, context.time.milliseconds()); |
| |
| List<Record> expectedRecords = Arrays.asList( |
| RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-1", 0), |
| RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 0), |
| RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 1), |
| RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-2", 0) |
| ); |
| |
| List<Record> records = new ArrayList<>(); |
| int numDeleteOffsets = context.deleteAllOffsets("foo", records); |
| |
| assertEquals(expectedRecords, records); |
| assertEquals(4, numDeleteOffsets); |
| |
| assertFalse(context.hasOffset("foo", "bar-0", 0)); |
| assertFalse(context.hasOffset("foo", "bar-0", 1)); |
| assertFalse(context.hasOffset("foo", "bar-1", 0)); |
| assertFalse(context.hasOffset("foo", "bar-2", 0)); |
| } |
| |
| @Test |
| public void testCleanupExpiredOffsetsGroupHasNoOffsets() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() |
| .build(); |
| |
| List<Record> records = new ArrayList<>(); |
| assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records)); |
| assertEquals(Collections.emptyList(), records); |
| } |
| |
| @Test |
| public void testCleanupExpiredOffsetsGroupDoesNotExist() { |
| GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() |
| .withGroupMetadataManager(groupMetadataManager) |
| .build(); |
| |
| when(groupMetadataManager.group("unknown-group-id")).thenThrow(GroupIdNotFoundException.class); |
| context.commitOffset("unknown-group-id", "topic", 0, 100L, 0); |
| assertThrows(GroupIdNotFoundException.class, () -> context.cleanupExpiredOffsets("unknown-group-id", new ArrayList<>())); |
| } |
| |
| @Test |
| public void testCleanupExpiredOffsetsEmptyOffsetExpirationCondition() { |
| GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); |
| Group group = mock(Group.class); |
| |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() |
| .withGroupMetadataManager(groupMetadataManager) |
| .build(); |
| |
| context.commitOffset("group-id", "topic", 0, 100L, 0); |
| |
| when(groupMetadataManager.group("group-id")).thenReturn(group); |
| when(group.offsetExpirationCondition()).thenReturn(Optional.empty()); |
| |
| List<Record> records = new ArrayList<>(); |
| assertFalse(context.cleanupExpiredOffsets("group-id", records)); |
| assertEquals(Collections.emptyList(), records); |
| } |
| |
| @Test |
| public void testCleanupExpiredOffsets() { |
| GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); |
| Group group = mock(Group.class); |
| |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() |
| .withGroupMetadataManager(groupMetadataManager) |
| .withOffsetsRetentionMs(1000) |
| .build(); |
| |
| long commitTimestamp = context.time.milliseconds(); |
| |
| context.commitOffset("group-id", "firstTopic", 0, 100L, 0, commitTimestamp); |
| context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp); |
| context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500); |
| |
| context.time.sleep(1000); |
| |
| // firstTopic-0: group is still subscribed to firstTopic. Do not expire. |
| // secondTopic-0: should expire as offset retention has passed. |
| // secondTopic-1: has not passed offset retention. Do not expire. |
| List<Record> expectedRecords = Collections.singletonList( |
| RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 0) |
| ); |
| |
| when(groupMetadataManager.group("group-id")).thenReturn(group); |
| when(group.offsetExpirationCondition()).thenReturn(Optional.of( |
| new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs))); |
| when(group.isSubscribedToTopic("firstTopic")).thenReturn(true); |
| when(group.isSubscribedToTopic("secondTopic")).thenReturn(false); |
| |
| List<Record> records = new ArrayList<>(); |
| assertFalse(context.cleanupExpiredOffsets("group-id", records)); |
| assertEquals(expectedRecords, records); |
| |
| // Expire secondTopic-1. |
| context.time.sleep(500); |
| expectedRecords = Collections.singletonList( |
| RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 1) |
| ); |
| |
| records = new ArrayList<>(); |
| assertFalse(context.cleanupExpiredOffsets("group-id", records)); |
| assertEquals(expectedRecords, records); |
| |
| // Add 2 more commits, then expire all. |
| when(group.isSubscribedToTopic("firstTopic")).thenReturn(false); |
| context.commitOffset("group-id", "firstTopic", 1, 100L, 0, commitTimestamp + 500); |
| context.commitOffset("group-id", "secondTopic", 0, 101L, 0, commitTimestamp + 500); |
| |
| expectedRecords = Arrays.asList( |
| RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "firstTopic", 0), |
| RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "firstTopic", 1), |
| RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 0) |
| ); |
| |
| records = new ArrayList<>(); |
| assertTrue(context.cleanupExpiredOffsets("group-id", records)); |
| assertEquals(expectedRecords, records); |
| } |
| |
| @Test |
| public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() { |
| GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); |
| Group group = mock(Group.class); |
| |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() |
| .withGroupMetadataManager(groupMetadataManager) |
| .withOffsetsRetentionMs(1000) |
| .build(); |
| |
| long commitTimestamp = context.time.milliseconds(); |
| |
| context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp); |
| context.commitOffset(10L, "group-id", "foo", 0, 101L, 0, commitTimestamp + 500); |
| |
| context.time.sleep(1000); |
| |
| when(groupMetadataManager.group("group-id")).thenReturn(group); |
| when(group.offsetExpirationCondition()).thenReturn(Optional.of( |
| new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs))); |
| when(group.isSubscribedToTopic("foo")).thenReturn(false); |
| |
| // foo-0 should not be expired because it has a pending transactional offset commit. |
| List<Record> records = new ArrayList<>(); |
| assertFalse(context.cleanupExpiredOffsets("group-id", records)); |
| assertEquals(Collections.emptyList(), records); |
| } |
| |
| static private OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse( |
| int partition, |
| long offset, |
| int leaderEpoch, |
| String metadata |
| ) { |
| return new OffsetFetchResponseData.OffsetFetchResponsePartitions() |
| .setPartitionIndex(partition) |
| .setCommittedOffset(offset) |
| .setCommittedLeaderEpoch(leaderEpoch) |
| .setMetadata(metadata); |
| } |
| |
| static private OffsetFetchResponseData.OffsetFetchResponsePartitions mkInvalidOffsetPartitionResponse(int partition) { |
| return new OffsetFetchResponseData.OffsetFetchResponsePartitions() |
| .setPartitionIndex(partition) |
| .setCommittedOffset(INVALID_OFFSET) |
| .setCommittedLeaderEpoch(-1) |
| .setMetadata(""); |
| } |
| |
| static private OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(int partition, Errors error) { |
| return new OffsetFetchResponseData.OffsetFetchResponsePartitions() |
| .setPartitionIndex(partition) |
| .setErrorCode(error.code()) |
| .setCommittedOffset(INVALID_OFFSET) |
| .setCommittedLeaderEpoch(-1) |
| .setMetadata(""); |
| } |
| |
| @Test |
| public void testReplay() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata( |
| 0L, |
| 100L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata( |
| 1L, |
| 200L, |
| OptionalInt.of(10), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| verifyReplay(context, "foo", "bar", 1, new OffsetAndMetadata( |
| 2L, |
| 200L, |
| OptionalInt.of(10), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| verifyReplay(context, "foo", "bar", 1, new OffsetAndMetadata( |
| 3L, |
| 300L, |
| OptionalInt.of(10), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.of(12345L) |
| )); |
| } |
| |
| @Test |
| public void testTransactionalReplay() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| verifyTransactionalReplay(context, 5, "foo", "bar", 0, new OffsetAndMetadata( |
| 0L, |
| 100L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| verifyTransactionalReplay(context, 5, "foo", "bar", 1, new OffsetAndMetadata( |
| 1L, |
| 101L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| verifyTransactionalReplay(context, 5, "bar", "zar", 0, new OffsetAndMetadata( |
| 2L, |
| 100L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| verifyTransactionalReplay(context, 5, "bar", "zar", 1, new OffsetAndMetadata( |
| 3L, |
| 101L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| verifyTransactionalReplay(context, 6, "foo", "bar", 2, new OffsetAndMetadata( |
| 4L, |
| 102L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| verifyTransactionalReplay(context, 6, "foo", "bar", 3, new OffsetAndMetadata( |
| 5L, |
| 102L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| } |
| |
| @Test |
| public void testReplayWithTombstoneAndPendingTransactionalOffsets() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Add the offsets. |
| verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata( |
| 0L, |
| 100L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| verifyTransactionalReplay(context, 10L, "foo", "bar", 0, new OffsetAndMetadata( |
| 1L, |
| 100L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| verifyTransactionalReplay(context, 10L, "foo", "bar", 1, new OffsetAndMetadata( |
| 2L, |
| 100L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| // Delete the offsets. |
| context.replay(RecordHelpers.newOffsetCommitTombstoneRecord( |
| "foo", |
| "bar", |
| 0 |
| )); |
| |
| context.replay(RecordHelpers.newOffsetCommitTombstoneRecord( |
| "foo", |
| "bar", |
| 1 |
| )); |
| |
| // Verify that the offset is gone. |
| assertFalse(context.hasOffset("foo", "bar", 0)); |
| assertFalse(context.hasOffset("foo", "bar", 1)); |
| } |
| |
| @Test |
| public void testReplayTransactionEndMarkerWithCommit() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Add regular offset commit. |
| verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata( |
| 0L, |
| 99L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| // Add pending transactional commit for producer id 5. |
| verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new OffsetAndMetadata( |
| 1L, |
| 100L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| // Add pending transactional commit for producer id 6. |
| verifyTransactionalReplay(context, 6L, "foo", "bar", 1, new OffsetAndMetadata( |
| 2L, |
| 200L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| // Replaying an end marker with an unknown producer id should not fail. |
| context.replayEndTransactionMarker(1L, TransactionResult.COMMIT); |
| |
| // Replaying an end marker to commit transaction of producer id 5. |
| context.replayEndTransactionMarker(5L, TransactionResult.COMMIT); |
| |
| // The pending offset is removed... |
| assertNull(context.offsetMetadataManager.pendingTransactionalOffset( |
| 5L, |
| "foo", |
| "bar", |
| 0 |
| )); |
| |
| // ... and added to the main offset storage. |
| assertEquals(new OffsetAndMetadata( |
| 1L, |
| 100L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| ), context.offsetMetadataManager.offset( |
| "foo", |
| "bar", |
| 0 |
| )); |
| |
| // Replaying an end marker to abort transaction of producer id 6. |
| context.replayEndTransactionMarker(6L, TransactionResult.ABORT); |
| |
| // The pending offset is removed from the pending offsets and |
| // it is not added to the main offset storage. |
| assertNull(context.offsetMetadataManager.pendingTransactionalOffset( |
| 6L, |
| "foo", |
| "bar", |
| 1 |
| )); |
| assertNull(context.offsetMetadataManager.offset( |
| "foo", |
| "bar", |
| 1 |
| )); |
| } |
| |
| @Test |
| public void testReplayTransactionEndMarkerKeepsTheMostRecentCommittedOffset() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Add pending transactional offset commit for producer id 5. |
| verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new OffsetAndMetadata( |
| 0L, |
| 100L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| // Add regular offset commit. |
| verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata( |
| 1L, |
| 101L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| // Replaying an end marker to commit transaction of producer id 5. |
| context.replayEndTransactionMarker(5L, TransactionResult.COMMIT); |
| |
| // The pending offset is removed... |
| assertNull(context.offsetMetadataManager.pendingTransactionalOffset( |
| 5L, |
| "foo", |
| "bar", |
| 0 |
| )); |
| |
| // ... but it is not added to the main storage because the regular |
| // committed offset is more recent. |
| assertEquals(new OffsetAndMetadata( |
| 1L, |
| 101L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| ), context.offsetMetadataManager.offset( |
| "foo", |
| "bar", |
| 0 |
| )); |
| } |
| |
| @Test |
| public void testOffsetCommitsNumberMetricWithTransactionalOffsets() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Add pending transactional commit for producer id 4. |
| verifyTransactionalReplay(context, 4L, "foo", "bar", 0, new OffsetAndMetadata( |
| 0L, |
| 100L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| // Add pending transactional commit for producer id 5. |
| verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new OffsetAndMetadata( |
| 1L, |
| 101L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| // Add pending transactional commit for producer id 6. |
| verifyTransactionalReplay(context, 6L, "foo", "bar", 1, new OffsetAndMetadata( |
| 2L, |
| 200L, |
| OptionalInt.empty(), |
| "small", |
| context.time.milliseconds(), |
| OptionalLong.empty() |
| )); |
| |
| // Commit all the transactions. |
| context.replayEndTransactionMarker(4L, TransactionResult.COMMIT); |
| context.replayEndTransactionMarker(5L, TransactionResult.COMMIT); |
| context.replayEndTransactionMarker(6L, TransactionResult.COMMIT); |
| |
| // Verify the sensor is called twice as we have only |
| // two partitions. |
| verify(context.metrics, times(2)).incrementNumOffsets(); |
| } |
| |
| @Test |
| public void testOffsetCommitsSensor() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Create an empty group. |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup( |
| "foo", |
| true |
| ); |
| |
| // Add member. |
| group.add(mkGenericMember("member", Optional.of("new-instance-id"))); |
| |
| // Transition to next generation. |
| group.transitionTo(ClassicGroupState.PREPARING_REBALANCE); |
| group.initNextGeneration(); |
| assertEquals(1, group.generationId()); |
| group.transitionTo(ClassicGroupState.STABLE); |
| |
| CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset( |
| new OffsetCommitRequestData() |
| .setGroupId("foo") |
| .setMemberId("member") |
| .setGenerationIdOrMemberEpoch(1) |
| .setRetentionTimeMs(1234L) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(0) |
| .setCommittedOffset(100L), |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(1) |
| .setCommittedOffset(150L) |
| )) |
| )) |
| ); |
| |
| verify(context.metrics).record(OFFSET_COMMITS_SENSOR_NAME, 2); |
| } |
| |
| @Test |
| public void testOffsetsExpiredSensor() { |
| GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); |
| Group group = mock(Group.class); |
| |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() |
| .withGroupMetadataManager(groupMetadataManager) |
| .withOffsetsRetentionMs(1000) |
| .build(); |
| |
| long commitTimestamp = context.time.milliseconds(); |
| |
| context.commitOffset("group-id", "firstTopic", 0, 100L, 0, commitTimestamp); |
| context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp); |
| context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500); |
| |
| context.time.sleep(1000); |
| |
| // firstTopic-0: group is still subscribed to firstTopic. Do not expire. |
| // secondTopic-0: should expire as offset retention has passed. |
| // secondTopic-1: has not passed offset retention. Do not expire. |
| List<Record> expectedRecords = Collections.singletonList( |
| RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 0) |
| ); |
| |
| when(groupMetadataManager.group("group-id")).thenReturn(group); |
| when(group.offsetExpirationCondition()).thenReturn(Optional.of( |
| new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs))); |
| when(group.isSubscribedToTopic("firstTopic")).thenReturn(true); |
| when(group.isSubscribedToTopic("secondTopic")).thenReturn(false); |
| |
| List<Record> records = new ArrayList<>(); |
| assertFalse(context.cleanupExpiredOffsets("group-id", records)); |
| assertEquals(expectedRecords, records); |
| |
| // Expire secondTopic-1. |
| context.time.sleep(500); |
| |
| records = new ArrayList<>(); |
| assertFalse(context.cleanupExpiredOffsets("group-id", records)); |
| verify(context.metrics, times(2)).record(OFFSET_EXPIRED_SENSOR_NAME, 1); |
| |
| // Add 2 more commits, then expire all. |
| when(group.isSubscribedToTopic("firstTopic")).thenReturn(false); |
| context.commitOffset("group-id", "firstTopic", 1, 100L, 0, commitTimestamp + 500); |
| context.commitOffset("group-id", "secondTopic", 0, 101L, 0, commitTimestamp + 500); |
| |
| records = new ArrayList<>(); |
| assertTrue(context.cleanupExpiredOffsets("group-id", records)); |
| verify(context.metrics).record(OFFSET_EXPIRED_SENSOR_NAME, 3); |
| } |
| |
| @Test |
| public void testOffsetDeletionsSensor() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true); |
| |
| context.commitOffset("foo", "bar", 0, 100L, 0); |
| context.commitOffset("foo", "bar", 1, 150L, 0); |
| group.setSubscribedTopics(Optional.of(Collections.emptySet())); |
| |
| OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = |
| new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( |
| new OffsetDeleteRequestData.OffsetDeleteRequestTopic() |
| .setName("bar") |
| .setPartitions(Arrays.asList( |
| new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0), |
| new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(1) |
| )) |
| ).iterator()); |
| |
| context.deleteOffsets( |
| new OffsetDeleteRequestData() |
| .setGroupId("foo") |
| .setTopics(requestTopicCollection) |
| ); |
| |
| verify(context.metrics).record(OFFSET_DELETIONS_SENSOR_NAME, 2); |
| } |
| |
| @Test |
| public void testOnPartitionsDeleted() { |
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); |
| |
| // Commit offsets. |
| context.commitOffset("grp-0", "foo", 1, 100, 1, context.time.milliseconds()); |
| context.commitOffset("grp-0", "foo", 2, 200, 1, context.time.milliseconds()); |
| context.commitOffset("grp-0", "foo", 3, 300, 1, context.time.milliseconds()); |
| |
| context.commitOffset("grp-1", "bar", 1, 100, 1, context.time.milliseconds()); |
| context.commitOffset("grp-1", "bar", 2, 200, 1, context.time.milliseconds()); |
| context.commitOffset("grp-1", "bar", 3, 300, 1, context.time.milliseconds()); |
| |
| context.commitOffset(100L, "grp-2", "foo", 1, 100, 1, context.time.milliseconds()); |
| context.commitOffset(100L, "grp-2", "foo", 2, 200, 1, context.time.milliseconds()); |
| context.commitOffset(100L, "grp-2", "foo", 3, 300, 1, context.time.milliseconds()); |
| |
| // Delete partitions. |
| List<Record> records = context.deletePartitions(Arrays.asList( |
| new TopicPartition("foo", 1), |
| new TopicPartition("foo", 2), |
| new TopicPartition("foo", 3), |
| new TopicPartition("bar", 1) |
| )); |
| |
| // Verify. |
| List<Record> expectedRecords = Arrays.asList( |
| RecordHelpers.newOffsetCommitTombstoneRecord("grp-0", "foo", 1), |
| RecordHelpers.newOffsetCommitTombstoneRecord("grp-0", "foo", 2), |
| RecordHelpers.newOffsetCommitTombstoneRecord("grp-0", "foo", 3), |
| RecordHelpers.newOffsetCommitTombstoneRecord("grp-1", "bar", 1), |
| RecordHelpers.newOffsetCommitTombstoneRecord("grp-2", "foo", 1), |
| RecordHelpers.newOffsetCommitTombstoneRecord("grp-2", "foo", 2), |
| RecordHelpers.newOffsetCommitTombstoneRecord("grp-2", "foo", 3) |
| ); |
| |
| assertEquals(new HashSet<>(expectedRecords), new HashSet<>(records)); |
| |
| assertFalse(context.hasOffset("grp-0", "foo", 1)); |
| assertFalse(context.hasOffset("grp-0", "foo", 2)); |
| assertFalse(context.hasOffset("grp-0", "foo", 3)); |
| assertFalse(context.hasOffset("grp-1", "bar", 1)); |
| assertFalse(context.hasOffset("grp-2", "foo", 1)); |
| assertFalse(context.hasOffset("grp-2", "foo", 2)); |
| assertFalse(context.hasOffset("grp-2", "foo", 3)); |
| } |
| |
| private void verifyReplay( |
| OffsetMetadataManagerTestContext context, |
| String groupId, |
| String topic, |
| int partition, |
| OffsetAndMetadata offsetAndMetadata |
| ) { |
| context.replay(RecordHelpers.newOffsetCommitRecord( |
| groupId, |
| topic, |
| partition, |
| offsetAndMetadata, |
| MetadataImage.EMPTY.features().metadataVersion() |
| )); |
| |
| assertEquals(offsetAndMetadata, context.offsetMetadataManager.offset( |
| groupId, |
| topic, |
| partition |
| )); |
| } |
| |
| private void verifyTransactionalReplay( |
| OffsetMetadataManagerTestContext context, |
| long producerId, |
| String groupId, |
| String topic, |
| int partition, |
| OffsetAndMetadata offsetAndMetadata |
| ) { |
| context.replay(producerId, RecordHelpers.newOffsetCommitRecord( |
| groupId, |
| topic, |
| partition, |
| offsetAndMetadata, |
| MetadataImage.EMPTY.features().metadataVersion() |
| )); |
| |
| assertEquals(offsetAndMetadata, context.offsetMetadataManager.pendingTransactionalOffset( |
| producerId, |
| groupId, |
| topic, |
| partition |
| )); |
| } |
| |
| private ClassicGroupMember mkGenericMember( |
| String memberId, |
| Optional<String> groupInstanceId |
| ) { |
| return new ClassicGroupMember( |
| memberId, |
| groupInstanceId, |
| "client-id", |
| "host", |
| 5000, |
| 5000, |
| "consumer", |
| new JoinGroupRequestData.JoinGroupRequestProtocolCollection( |
| Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol() |
| .setName("range") |
| .setMetadata(new byte[0]) |
| ).iterator() |
| ) |
| ); |
| } |
| } |