blob: 20b4428fd90d8fb62a62d17ac6d04836994f0bb5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.RecordHelpers;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineInteger;
import org.apache.kafka.timeline.TimelineObject;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.ASSIGNING;
import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.EMPTY;
import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.RECONCILING;
import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.STABLE;
/**
* A Consumer Group. All the metadata in this class are backed by
* records in the __consumer_offsets partitions.
*/
public class ConsumerGroup implements Group {
public enum ConsumerGroupState {
EMPTY("Empty"),
ASSIGNING("Assigning"),
RECONCILING("Reconciling"),
STABLE("Stable"),
DEAD("Dead");
private final String name;
private final String lowerCaseName;
ConsumerGroupState(String name) {
this.name = name;
this.lowerCaseName = name.toLowerCase(Locale.ROOT);
}
@Override
public String toString() {
return name;
}
public String toLowerCaseString() {
return lowerCaseName;
}
}
public static class DeadlineAndEpoch {
static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
public final long deadlineMs;
public final int epoch;
DeadlineAndEpoch(long deadlineMs, int epoch) {
this.deadlineMs = deadlineMs;
this.epoch = epoch;
}
}
/**
* The snapshot registry.
*/
private final SnapshotRegistry snapshotRegistry;
/**
* The group id.
*/
private final String groupId;
/**
* The group state.
*/
private final TimelineObject<ConsumerGroupState> state;
/**
* The group epoch. The epoch is incremented whenever the subscriptions
* are updated and it will trigger the computation of a new assignment
* for the group.
*/
private final TimelineInteger groupEpoch;
/**
* The group members.
*/
private final TimelineHashMap<String, ConsumerGroupMember> members;
/**
* The static group members.
*/
private final TimelineHashMap<String, String> staticMembers;
/**
* The number of members supporting each server assignor name.
*/
private final TimelineHashMap<String, Integer> serverAssignors;
/**
* The number of subscribers per topic.
*/
private final TimelineHashMap<String, Integer> subscribedTopicNames;
/**
* The metadata associated with each subscribed topic name.
*/
private final TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata;
/**
* The target assignment epoch. An assignment epoch smaller than the group epoch
* means that a new assignment is required. The assignment epoch is updated when
* a new assignment is installed.
*/
private final TimelineInteger targetAssignmentEpoch;
/**
* The target assignment per member id.
*/
private final TimelineHashMap<String, Assignment> targetAssignment;
/**
* The current partition epoch maps each topic-partitions to their current epoch where
* the epoch is the epoch of their owners. When a member revokes a partition, it removes
* its epochs from this map. When a member gets a partition, it adds its epochs to this map.
*/
private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
/**
* The coordinator metrics.
*/
private final GroupCoordinatorMetricsShard metrics;
/**
* The metadata refresh deadline. It consists of a timestamp in milliseconds together with
* the group epoch at the time of setting it. The metadata refresh time is considered as a
* soft state (read that it is not stored in a timeline data structure). It is like this
* because it is not persisted to the log. The group epoch is here to ensure that the
* metadata refresh deadline is invalidated if the group epoch does not correspond to
* the current group epoch. This can happen if the metadata refresh deadline is updated
* after having refreshed the metadata but the write operation failed. In this case, the
* time is not automatically rolled back.
*/
private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
/**
* The number of members that use the classic protocol.
*/
private final TimelineInteger numClassicProtocolMembers;
/**
* Map of protocol names to the number of members that use classic protocol and support them.
*/
private final TimelineHashMap<String, Integer> classicProtocolMembersSupportedProtocols;
public ConsumerGroup(
SnapshotRegistry snapshotRegistry,
String groupId,
GroupCoordinatorMetricsShard metrics
) {
this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
this.groupId = Objects.requireNonNull(groupId);
this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
this.groupEpoch = new TimelineInteger(snapshotRegistry);
this.members = new TimelineHashMap<>(snapshotRegistry, 0);
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
this.metrics = Objects.requireNonNull(metrics);
this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
this.classicProtocolMembersSupportedProtocols = new TimelineHashMap<>(snapshotRegistry, 0);
}
/**
* @return The group type (Consumer).
*/
@Override
public GroupType type() {
return GroupType.CONSUMER;
}
/**
* @return The current state as a String.
*/
@Override
public String stateAsString() {
return state.get().toString();
}
/**
* @return The current state as a String with given committedOffset.
*/
public String stateAsString(long committedOffset) {
return state.get(committedOffset).toString();
}
/**
* @return the group formatted as a list group response based on the committed offset.
*/
public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) {
return new ListGroupsResponseData.ListedGroup()
.setGroupId(groupId)
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState(state.get(committedOffset).toString())
.setGroupType(type().toString());
}
/**
* @return The group id.
*/
@Override
public String groupId() {
return groupId;
}
/**
* @return The current state.
*/
public ConsumerGroupState state() {
return state.get();
}
/**
* @return The current state based on committed offset.
*/
public ConsumerGroupState state(long committedOffset) {
return state.get(committedOffset);
}
/**
* @return The group epoch.
*/
public int groupEpoch() {
return groupEpoch.get();
}
/**
* Sets the group epoch.
*
* @param groupEpoch The new group epoch.
*/
public void setGroupEpoch(int groupEpoch) {
this.groupEpoch.set(groupEpoch);
maybeUpdateGroupState();
}
/**
* @return The target assignment epoch.
*/
public int assignmentEpoch() {
return targetAssignmentEpoch.get();
}
/**
* Sets the assignment epoch.
*
* @param targetAssignmentEpoch The new assignment epoch.
*/
public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
this.targetAssignmentEpoch.set(targetAssignmentEpoch);
maybeUpdateGroupState();
}
/**
* Sets the number of members using the classic protocol.
*
* @param numClassicProtocolMembers The new NumClassicProtocolMembers.
*/
public void setNumClassicProtocolMembers(int numClassicProtocolMembers) {
this.numClassicProtocolMembers.set(numClassicProtocolMembers);
}
/**
* Get member id of a static member that matches the given group
* instance id.
*
* @param groupInstanceId The group instance id.
*
* @return The member id corresponding to the given instance id or null if it does not exist
*/
public String staticMemberId(String groupInstanceId) {
return staticMembers.get(groupInstanceId);
}
/**
* Gets or creates a new member but without adding it to the group. Adding a member
* is done via the {@link ConsumerGroup#updateMember(ConsumerGroupMember)} method.
*
* @param memberId The member id.
* @param createIfNotExists Booleans indicating whether the member must be
* created if it does not exist.
*
* @return A ConsumerGroupMember.
*/
public ConsumerGroupMember getOrMaybeCreateMember(
String memberId,
boolean createIfNotExists
) {
ConsumerGroupMember member = members.get(memberId);
if (member != null) return member;
if (!createIfNotExists) {
throw new UnknownMemberIdException(
String.format("Member %s is not a member of group %s.", memberId, groupId)
);
}
return new ConsumerGroupMember.Builder(memberId).build();
}
/**
* Gets a static member.
*
* @param instanceId The group instance id.
*
* @return The member corresponding to the given instance id or null if it does not exist
*/
public ConsumerGroupMember staticMember(String instanceId) {
String existingMemberId = staticMemberId(instanceId);
return existingMemberId == null ? null : getOrMaybeCreateMember(existingMemberId, false);
}
/**
* Adds or updates the member.
*
* @param newMember The new member state.
*/
public void updateMember(ConsumerGroupMember newMember) {
if (newMember == null) {
throw new IllegalArgumentException("newMember cannot be null.");
}
ConsumerGroupMember oldMember = members.put(newMember.memberId(), newMember);
maybeUpdateSubscribedTopicNames(oldMember, newMember);
maybeUpdateServerAssignors(oldMember, newMember);
maybeUpdatePartitionEpoch(oldMember, newMember);
updateStaticMember(newMember);
maybeUpdateGroupState();
maybeUpdateNumClassicProtocolMembers(oldMember, newMember);
maybeUpdateClassicProtocolMembersSupportedProtocols(oldMember, newMember);
}
/**
* Updates the member id stored against the instance id if the member is a static member.
*
* @param newMember The new member state.
*/
private void updateStaticMember(ConsumerGroupMember newMember) {
if (newMember.instanceId() != null) {
staticMembers.put(newMember.instanceId(), newMember.memberId());
}
}
/**
* Remove the member from the group.
*
* @param memberId The member id to remove.
*/
public void removeMember(String memberId) {
ConsumerGroupMember oldMember = members.remove(memberId);
maybeUpdateSubscribedTopicNames(oldMember, null);
maybeUpdateServerAssignors(oldMember, null);
maybeRemovePartitionEpoch(oldMember);
removeStaticMember(oldMember);
maybeUpdateGroupState();
maybeUpdateNumClassicProtocolMembers(oldMember, null);
maybeUpdateClassicProtocolMembersSupportedProtocols(oldMember, null);
}
/**
* Remove the static member mapping if the removed member is static.
*
* @param oldMember The member to remove.
*/
private void removeStaticMember(ConsumerGroupMember oldMember) {
if (oldMember.instanceId() != null) {
staticMembers.remove(oldMember.instanceId());
}
}
/**
* Returns true if the member exists.
*
* @param memberId The member id.
*
* @return A boolean indicating whether the member exists or not.
*/
public boolean hasMember(String memberId) {
return members.containsKey(memberId);
}
/**
* @return The number of members.
*/
public int numMembers() {
return members.size();
}
/**
* @return The number of members that use the classic protocol.
*/
public int numClassicProtocolMembers() {
return numClassicProtocolMembers.get();
}
/**
* @return The map of the protocol name and the number of members using the classic protocol that support it.
*/
public Map<String, Integer> classicMembersSupportedProtocols() {
return Collections.unmodifiableMap(classicProtocolMembersSupportedProtocols);
}
/**
* @return An immutable Map containing all the members keyed by their id.
*/
public Map<String, ConsumerGroupMember> members() {
return Collections.unmodifiableMap(members);
}
/**
* @return An immutable Map containing all the static members keyed by instance id.
*/
public Map<String, String> staticMembers() {
return Collections.unmodifiableMap(staticMembers);
}
/**
* @return An immutable Set containing all the subscribed topic names.
*/
public Set<String> subscribedTopicNames() {
return Collections.unmodifiableSet(subscribedTopicNames.keySet());
}
/**
* Returns true if the consumer group is actively subscribed to the topic.
*
* @param topic The topic name.
*
* @return Whether the group is subscribed to the topic.
*/
@Override
public boolean isSubscribedToTopic(String topic) {
return subscribedTopicNames.containsKey(topic);
}
/**
* Returns the target assignment of the member.
*
* @return The ConsumerGroupMemberAssignment or an EMPTY one if it does not
* exist.
*/
public Assignment targetAssignment(String memberId) {
return targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
}
/**
* Updates target assignment of a member.
*
* @param memberId The member id.
* @param newTargetAssignment The new target assignment.
*/
public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
targetAssignment.put(memberId, newTargetAssignment);
}
/**
* Removes the target assignment of a member.
*
* @param memberId The member id.
*/
public void removeTargetAssignment(String memberId) {
targetAssignment.remove(memberId);
}
/**
* @return An immutable Map containing all the target assignment keyed by member id.
*/
public Map<String, Assignment> targetAssignment() {
return Collections.unmodifiableMap(targetAssignment);
}
/**
* Returns the current epoch of a partition or -1 if the partition
* does not have one.
*
* @param topicId The topic id.
* @param partitionId The partition id.
*
* @return The epoch or -1.
*/
public int currentPartitionEpoch(
Uuid topicId, int partitionId
) {
Map<Integer, Integer> partitions = currentPartitionEpoch.get(topicId);
if (partitions == null) {
return -1;
} else {
return partitions.getOrDefault(partitionId, -1);
}
}
/**
* Compute the preferred (server side) assignor for the group while
* taking into account the updated member. The computation relies
* on {{@link ConsumerGroup#serverAssignors}} persisted structure
* but it does not update it.
*
* @param oldMember The old member.
* @param newMember The new member.
*
* @return An Optional containing the preferred assignor.
*/
public Optional<String> computePreferredServerAssignor(
ConsumerGroupMember oldMember,
ConsumerGroupMember newMember
) {
// Copy the current count and update it.
Map<String, Integer> counts = new HashMap<>(this.serverAssignors);
maybeUpdateServerAssignors(counts, oldMember, newMember);
return counts.entrySet().stream()
.max(Map.Entry.comparingByValue())
.map(Map.Entry::getKey);
}
/**
* @return The preferred assignor for the group.
*/
public Optional<String> preferredServerAssignor() {
return preferredServerAssignor(Long.MAX_VALUE);
}
/**
* @return The preferred assignor for the group with given offset.
*/
public Optional<String> preferredServerAssignor(long committedOffset) {
return serverAssignors.entrySet(committedOffset).stream()
.max(Map.Entry.comparingByValue())
.map(Map.Entry::getKey);
}
/**
* @return An immutable Map of subscription metadata for
* each topic that the consumer group is subscribed to.
*/
public Map<String, TopicMetadata> subscriptionMetadata() {
return Collections.unmodifiableMap(subscribedTopicMetadata);
}
/**
* Updates the subscription metadata. This replaces the previous one.
*
* @param subscriptionMetadata The new subscription metadata.
*/
public void setSubscriptionMetadata(
Map<String, TopicMetadata> subscriptionMetadata
) {
this.subscribedTopicMetadata.clear();
this.subscribedTopicMetadata.putAll(subscriptionMetadata);
}
/**
* Computes the subscription metadata based on the current subscription and
* an updated member.
*
* @param oldMember The old member of the consumer group.
* @param newMember The updated member of the consumer group.
* @param topicsImage The current metadata for all available topics.
* @param clusterImage The current metadata for the Kafka cluster.
*
* @return An immutable map of subscription metadata for each topic that the consumer group is subscribed to.
*/
public Map<String, TopicMetadata> computeSubscriptionMetadata(
ConsumerGroupMember oldMember,
ConsumerGroupMember newMember,
TopicsImage topicsImage,
ClusterImage clusterImage
) {
// Copy and update the current subscriptions.
Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames);
maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember);
// Create the topic metadata for each subscribed topic.
Map<String, TopicMetadata> newSubscriptionMetadata = new HashMap<>(subscribedTopicNames.size());
subscribedTopicNames.forEach((topicName, count) -> {
TopicImage topicImage = topicsImage.getTopic(topicName);
if (topicImage != null) {
Map<Integer, Set<String>> partitionRacks = new HashMap<>();
topicImage.partitions().forEach((partition, partitionRegistration) -> {
Set<String> racks = new HashSet<>();
for (int replica : partitionRegistration.replicas) {
Optional<String> rackOptional = clusterImage.broker(replica).rack();
// Only add the rack if it is available for the broker/replica.
rackOptional.ifPresent(racks::add);
}
// If rack information is unavailable for all replicas of this partition,
// no corresponding entry will be stored for it in the map.
if (!racks.isEmpty())
partitionRacks.put(partition, racks);
});
newSubscriptionMetadata.put(topicName, new TopicMetadata(
topicImage.id(),
topicImage.name(),
topicImage.partitions().size(),
partitionRacks)
);
}
});
return Collections.unmodifiableMap(newSubscriptionMetadata);
}
/**
* Updates the metadata refresh deadline.
*
* @param deadlineMs The deadline in milliseconds.
* @param groupEpoch The associated group epoch.
*/
public void setMetadataRefreshDeadline(
long deadlineMs,
int groupEpoch
) {
this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, groupEpoch);
}
/**
* Requests a metadata refresh.
*/
public void requestMetadataRefresh() {
this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
}
/**
* Checks if a metadata refresh is required. A refresh is required in two cases:
* 1) The deadline is smaller or equal to the current time;
* 2) The group epoch associated with the deadline is larger than
* the current group epoch. This means that the operations which updated
* the deadline failed.
*
* @param currentTimeMs The current time in milliseconds.
* @return A boolean indicating whether a refresh is required or not.
*/
public boolean hasMetadataExpired(long currentTimeMs) {
return currentTimeMs >= metadataRefreshDeadline.deadlineMs || groupEpoch() < metadataRefreshDeadline.epoch;
}
/**
* @return The metadata refresh deadline.
*/
public DeadlineAndEpoch metadataRefreshDeadline() {
return metadataRefreshDeadline;
}
/**
* Validates the OffsetCommit request.
*
* @param memberId The member id.
* @param groupInstanceId The group instance id.
* @param memberEpoch The member epoch.
* @param isTransactional Whether the offset commit is transactional or not. It has no
* impact when a consumer group is used.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
int memberEpoch,
boolean isTransactional
) throws UnknownMemberIdException, StaleMemberEpochException {
// When the member epoch is -1, the request comes from either the admin client
// or a consumer which does not use the group management facility. In this case,
// the request can commit offsets if the group is empty.
if (memberEpoch < 0 && members().isEmpty()) return;
final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false);
validateMemberEpoch(memberEpoch, member.memberEpoch());
}
/**
* Validates the OffsetFetch request.
*
* @param memberId The member id for consumer groups.
* @param memberEpoch The member epoch for consumer groups.
* @param lastCommittedOffset The last committed offsets in the timeline.
*/
@Override
public void validateOffsetFetch(
String memberId,
int memberEpoch,
long lastCommittedOffset
) throws UnknownMemberIdException, StaleMemberEpochException {
// When the member id is null and the member epoch is -1, the request either comes
// from the admin client or from a client which does not provide them. In this case,
// the fetch request is accepted.
if (memberId == null && memberEpoch < 0) return;
final ConsumerGroupMember member = members.get(memberId, lastCommittedOffset);
if (member == null) {
throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
memberId, groupId));
}
validateMemberEpoch(memberEpoch, member.memberEpoch());
}
/**
* Validates the OffsetDelete request.
*/
@Override
public void validateOffsetDelete() {}
/**
* Validates the DeleteGroups request.
*/
@Override
public void validateDeleteGroup() throws ApiException {
if (state() != ConsumerGroupState.EMPTY) {
throw Errors.NON_EMPTY_GROUP.exception();
}
}
/**
* Populates the list of records with tombstone(s) for deleting the group.
*
* @param records The list of records.
*/
@Override
public void createGroupTombstoneRecords(List<Record> records) {
members().forEach((memberId, member) ->
records.add(RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId(), memberId))
);
members().forEach((memberId, member) ->
records.add(RecordHelpers.newTargetAssignmentTombstoneRecord(groupId(), memberId))
);
records.add(RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId()));
members().forEach((memberId, member) ->
records.add(RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId(), memberId))
);
records.add(RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId()));
records.add(RecordHelpers.newGroupEpochTombstoneRecord(groupId()));
}
@Override
public boolean isEmpty() {
return state() == ConsumerGroupState.EMPTY;
}
/**
* See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
*
* @return The offset expiration condition for the group or Empty if no such condition exists.
*/
@Override
public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs));
}
@Override
public boolean isInStates(Set<String> statesFilter, long committedOffset) {
return statesFilter.contains(state.get(committedOffset).toLowerCaseString());
}
/**
* Throws a StaleMemberEpochException if the received member epoch does not match
* the expected member epoch.
*/
private void validateMemberEpoch(
int receivedMemberEpoch,
int expectedMemberEpoch
) throws StaleMemberEpochException {
if (receivedMemberEpoch != expectedMemberEpoch) {
throw new StaleMemberEpochException(String.format("The received member epoch %d does not match "
+ "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch));
}
}
/**
* Updates the current state of the group.
*/
private void maybeUpdateGroupState() {
ConsumerGroupState previousState = state.get();
ConsumerGroupState newState = STABLE;
if (members.isEmpty()) {
newState = EMPTY;
} else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
newState = ASSIGNING;
} else {
for (ConsumerGroupMember member : members.values()) {
if (!member.isReconciledTo(targetAssignmentEpoch.get())) {
newState = RECONCILING;
break;
}
}
}
state.set(newState);
metrics.onConsumerGroupStateTransition(previousState, newState);
}
/**
* Updates the server assignors count.
*
* @param oldMember The old member.
* @param newMember The new member.
*/
private void maybeUpdateServerAssignors(
ConsumerGroupMember oldMember,
ConsumerGroupMember newMember
) {
maybeUpdateServerAssignors(serverAssignors, oldMember, newMember);
}
/**
* Updates the server assignors count.
*
* @param serverAssignorCount The count to update.
* @param oldMember The old member.
* @param newMember The new member.
*/
private static void maybeUpdateServerAssignors(
Map<String, Integer> serverAssignorCount,
ConsumerGroupMember oldMember,
ConsumerGroupMember newMember
) {
if (oldMember != null) {
oldMember.serverAssignorName().ifPresent(name ->
serverAssignorCount.compute(name, ConsumerGroup::decValue)
);
}
if (newMember != null) {
newMember.serverAssignorName().ifPresent(name ->
serverAssignorCount.compute(name, ConsumerGroup::incValue)
);
}
}
/**
* Updates the number of the members that use the classic protocol.
*
* @param oldMember The old member.
* @param newMember The new member.
*/
private void maybeUpdateNumClassicProtocolMembers(
ConsumerGroupMember oldMember,
ConsumerGroupMember newMember
) {
int delta = 0;
if (oldMember != null && oldMember.useClassicProtocol()) {
delta--;
}
if (newMember != null && newMember.useClassicProtocol()) {
delta++;
}
setNumClassicProtocolMembers(numClassicProtocolMembers() + delta);
}
/**
* Updates the supported protocol count of the members that use the classic protocol.
*
* @param oldMember The old member.
* @param newMember The new member.
*/
private void maybeUpdateClassicProtocolMembersSupportedProtocols(
ConsumerGroupMember oldMember,
ConsumerGroupMember newMember
) {
if (oldMember != null) {
oldMember.supportedClassicProtocols().ifPresent(protocols ->
protocols.forEach(protocol ->
classicProtocolMembersSupportedProtocols.compute(protocol.name(), ConsumerGroup::decValue)
)
);
}
if (newMember != null) {
newMember.supportedClassicProtocols().ifPresent(protocols ->
protocols.forEach(protocol ->
classicProtocolMembersSupportedProtocols.compute(protocol.name(), ConsumerGroup::incValue)
)
);
}
}
/**
* Updates the subscribed topic names count.
*
* @param oldMember The old member.
* @param newMember The new member.
*/
private void maybeUpdateSubscribedTopicNames(
ConsumerGroupMember oldMember,
ConsumerGroupMember newMember
) {
maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember);
}
/**
* Updates the subscription count.
*
* @param subscribedTopicCount The map to update.
* @param oldMember The old member.
* @param newMember The new member.
*/
private static void maybeUpdateSubscribedTopicNames(
Map<String, Integer> subscribedTopicCount,
ConsumerGroupMember oldMember,
ConsumerGroupMember newMember
) {
if (oldMember != null) {
oldMember.subscribedTopicNames().forEach(topicName ->
subscribedTopicCount.compute(topicName, ConsumerGroup::decValue)
);
}
if (newMember != null) {
newMember.subscribedTopicNames().forEach(topicName ->
subscribedTopicCount.compute(topicName, ConsumerGroup::incValue)
);
}
}
/**
* Updates the partition epochs based on the old and the new member.
*
* @param oldMember The old member.
* @param newMember The new member.
*/
private void maybeUpdatePartitionEpoch(
ConsumerGroupMember oldMember,
ConsumerGroupMember newMember
) {
maybeRemovePartitionEpoch(oldMember);
addPartitionEpochs(newMember.assignedPartitions(), newMember.memberEpoch());
addPartitionEpochs(newMember.partitionsPendingRevocation(), newMember.memberEpoch());
}
/**
* Removes the partition epochs for the provided member.
*
* @param oldMember The old member.
*/
private void maybeRemovePartitionEpoch(
ConsumerGroupMember oldMember
) {
if (oldMember != null) {
removePartitionEpochs(oldMember.assignedPartitions(), oldMember.memberEpoch());
removePartitionEpochs(oldMember.partitionsPendingRevocation(), oldMember.memberEpoch());
}
}
/**
* Removes the partition epochs based on the provided assignment.
*
* @param assignment The assignment.
* @param expectedEpoch The expected epoch.
* @throws IllegalStateException if the epoch does not match the expected one.
* package-private for testing.
*/
void removePartitionEpochs(
Map<Uuid, Set<Integer>> assignment,
int expectedEpoch
) {
assignment.forEach((topicId, assignedPartitions) -> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
Integer prevValue = partitionsOrNull.remove(partitionId);
if (prevValue != expectedEpoch) {
throw new IllegalStateException(
String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
"still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
}
});
if (partitionsOrNull.isEmpty()) {
return null;
} else {
return partitionsOrNull;
}
} else {
throw new IllegalStateException(
String.format("Cannot remove the epoch %d from %s because it does not have any epoch",
expectedEpoch, topicId));
}
});
});
}
/**
* Adds the partitions epoch based on the provided assignment.
*
* @param assignment The assignment.
* @param epoch The new epoch.
* @throws IllegalStateException if the partition already has an epoch assigned.
* package-private for testing.
*/
void addPartitionEpochs(
Map<Uuid, Set<Integer>> assignment,
int epoch
) {
assignment.forEach((topicId, assignedPartitions) -> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull == null) {
partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
}
for (Integer partitionId : assignedPartitions) {
Integer prevValue = partitionsOrNull.put(partitionId, epoch);
if (prevValue != null) {
throw new IllegalStateException(
String.format("Cannot set the epoch of %s-%s to %d because the partition is " +
"still owned at epoch %d", topicId, partitionId, epoch, prevValue));
}
}
return partitionsOrNull;
});
});
}
/**
* Decrements value by 1; returns null when reaching zero. This helper is
* meant to be used with Map#compute.
*/
private static Integer decValue(String key, Integer value) {
if (value == null) return null;
return value == 1 ? null : value - 1;
}
/**
* Increments value by 1; This helper is meant to be used with Map#compute.
*/
private static Integer incValue(String key, Integer value) {
return value == null ? 1 : value + 1;
}
public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
long committedOffset,
String defaultAssignor,
TopicsImage topicsImage
) {
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setAssignorName(preferredServerAssignor(committedOffset).orElse(defaultAssignor))
.setGroupEpoch(groupEpoch.get(committedOffset))
.setGroupState(state.get(committedOffset).toString())
.setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset));
members.entrySet(committedOffset).forEach(
entry -> describedGroup.members().add(
entry.getValue().asConsumerGroupDescribeMember(
targetAssignment.get(entry.getValue().memberId(), committedOffset),
topicsImage
)
)
);
return describedGroup;
}
/**
* Create a new consumer group according to the given classic group.
*
* @param snapshotRegistry The SnapshotRegistry.
* @param metrics The GroupCoordinatorMetricsShard.
* @param classicGroup The converted classic group.
* @param topicsImage The TopicsImage for topic id and topic name conversion.
* @return The created ConsumerGruop.
*/
public static ConsumerGroup fromClassicGroup(
SnapshotRegistry snapshotRegistry,
GroupCoordinatorMetricsShard metrics,
ClassicGroup classicGroup,
TopicsImage topicsImage
) {
String groupId = classicGroup.groupId();
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
consumerGroup.setGroupEpoch(classicGroup.generationId());
consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
classicGroup.allMembers().forEach(classicGroupMember -> {
ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(
ByteBuffer.wrap(classicGroupMember.assignment())
);
Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))
);
// The target assignment and the assigned partitions of each member are set based on the last
// assignment of the classic group. All the members are put in the Stable state. If the classic
// group was in Preparing Rebalance or Completing Rebalance states, the classic members are
// asked to rejoin the group to re-trigger a rebalance or collect their assignments.
ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId())
.setMemberEpoch(classicGroup.generationId())
.setState(MemberState.STABLE)
.setPreviousMemberEpoch(classicGroup.generationId())
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
.setRackId(subscription.rackId().orElse(null))
.setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
.setClientId(classicGroupMember.clientId())
.setClientHost(classicGroupMember.clientHost())
.setSubscribedTopicNames(subscription.topics())
.setAssignedPartitions(partitions)
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
.build();
consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions));
consumerGroup.updateMember(newMember);
});
return consumerGroup;
}
/**
* Populate the record list with the records needed to create the given consumer group.
*
* @param records The list to which the new records are added.
*/
public void createConsumerGroupRecords(
List<Record> records
) {
members().forEach((__, consumerGroupMember) ->
records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), consumerGroupMember))
);
records.add(RecordHelpers.newGroupEpochRecord(groupId(), groupEpoch()));
members().forEach((consumerGroupMemberId, consumerGroupMember) ->
records.add(RecordHelpers.newTargetAssignmentRecord(
groupId(),
consumerGroupMemberId,
targetAssignment(consumerGroupMemberId).partitions()
))
);
records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), groupEpoch()));
members().forEach((__, consumerGroupMember) ->
records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), consumerGroupMember))
);
}
/**
* @return The map of topic id and partition set converted from the list of TopicPartition.
*/
private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
List<TopicPartition> partitions,
TopicsImage topicsImage
) {
Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
partitions.forEach(topicPartition -> {
TopicImage topicImage = topicsImage.getTopic(topicPartition.topic());
if (topicImage != null) {
topicPartitionMap
.computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
.add(topicPartition.partition());
}
});
return topicPartitionMap;
}
/**
* Checks whether at least one of the given protocols can be supported. A
* protocol can be supported if it is supported by all members that use the
* classic protocol.
*
* @param memberProtocolType The member protocol type.
* @param memberProtocols The set of protocol names.
*
* @return A boolean based on the condition mentioned above.
*/
public boolean supportsClassicProtocols(String memberProtocolType, Set<String> memberProtocols) {
if (isEmpty()) {
return !memberProtocolType.isEmpty() && !memberProtocols.isEmpty();
} else {
return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) &&
memberProtocols.stream().anyMatch(
name -> classicProtocolMembersSupportedProtocols.getOrDefault(name, 0) == numClassicProtocolMembers()
);
}
}
/**
* Checks whether all the members use the classic protocol except the given member.
*
* @param memberId The member to remove.
* @return A boolean indicating whether all the members use the classic protocol.
*/
public boolean allMembersUseClassicProtocolExcept(String memberId) {
return numClassicProtocolMembers() == members().size() - 1 &&
!getOrMaybeCreateMember(memberId, false).useClassicProtocol();
}
}