| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.coordinator.group.assignor; |
| |
| import org.apache.kafka.common.Uuid; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import static java.lang.Math.min; |
| import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; |
| |
| /** |
| * This Range Assignor inherits properties of both the range assignor and the sticky assignor. |
| * The properties are as follows: |
| * <ol> |
| * <li> Each member must get at least one partition from every topic that it is subscribed to. |
| * The only exception is when the number of subscribed members is greater than the |
| * number of partitions for that topic. (Range) </li> |
| * <li> Partitions should be assigned to members in a way that facilitates the join operation when required. (Range) |
| * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. |
| * Two streams are co-partitioned if the following conditions are met: |
| * <ul> |
| * <li> The keys must have the same schemas. </li> |
| * <li> The topics involved must have the same number of partitions. </li> |
| * </ul> |
| * </li> |
| * <li> Members should retain as much of their previous assignment as possible to reduce the number of partition |
| * movements during reassignment. (Sticky) </li> |
| * </ol> |
| */ |
| public class RangeAssignor implements PartitionAssignor { |
| public static final String RANGE_ASSIGNOR_NAME = "range"; |
| |
| @Override |
| public String name() { |
| return RANGE_ASSIGNOR_NAME; |
| } |
| |
| /** |
| * Pair of memberId and remaining partitions to meet the quota. |
| */ |
| private static class MemberWithRemainingAssignments { |
| /** |
| * Member Id. |
| */ |
| private final String memberId; |
| |
| /** |
| * Number of partitions required to meet the assignment quota. |
| */ |
| private final int remaining; |
| |
| public MemberWithRemainingAssignments(String memberId, int remaining) { |
| this.memberId = memberId; |
| this.remaining = remaining; |
| } |
| } |
| |
| /** |
| * Returns a map of topic Ids to a list of members subscribed to them, |
| * based on the given assignment specification and metadata. |
| * |
| * @param assignmentSpec The specification for member assignments. |
| * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters. |
| * @return A map of topic Ids to a list of member Ids subscribed to them. |
| * |
| * @throws PartitionAssignorException If a member is subscribed to a non-existent topic. |
| */ |
| private Map<Uuid, Collection<String>> membersPerTopic( |
| final AssignmentSpec assignmentSpec, |
| final SubscribedTopicDescriber subscribedTopicDescriber |
| ) { |
| Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>(); |
| Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members(); |
| |
| if (assignmentSpec.subscriptionType().equals(HOMOGENEOUS)) { |
| Set<String> allMembers = membersData.keySet(); |
| Collection<Uuid> topics = membersData.values().iterator().next().subscribedTopicIds(); |
| |
| for (Uuid topicId : topics) { |
| if (subscribedTopicDescriber.numPartitions(topicId) == -1) { |
| throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); |
| } |
| membersPerTopic.put(topicId, allMembers); |
| } |
| } else { |
| membersData.forEach((memberId, memberMetadata) -> { |
| Collection<Uuid> topics = memberMetadata.subscribedTopicIds(); |
| for (Uuid topicId : topics) { |
| if (subscribedTopicDescriber.numPartitions(topicId) == -1) { |
| throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); |
| } |
| membersPerTopic |
| .computeIfAbsent(topicId, k -> new ArrayList<>()) |
| .add(memberId); |
| } |
| }); |
| } |
| |
| return membersPerTopic; |
| } |
| |
| /** |
| * The algorithm includes the following steps: |
| * <ol> |
| * <li> Generate a map of members per topic using the given member subscriptions. </li> |
| * <li> Generate a list of members called potentially unfilled members, which consists of members that have not |
| * met the minimum required quota of partitions for the assignment AND get a list called assigned sticky |
| * partitions for topic, which has the partitions that will be retained in the new assignment. </li> |
| * <li> Generate a list of unassigned partitions by calculating the difference between the total partitions |
| * for the topic and the assigned (sticky) partitions. </li> |
| * <li> Find members from the potentially unfilled members list that haven't met the total required quota |
| * i.e. minRequiredQuota + 1, if the member is designated to receive one of the excess partitions OR |
| * minRequiredQuota otherwise. </li> |
| * <li> Assign partitions to them in ranges from the unassigned partitions per topic |
| * based on the remaining partitions value. </li> |
| * </ol> |
| */ |
| @Override |
| public GroupAssignment assign( |
| final AssignmentSpec assignmentSpec, |
| final SubscribedTopicDescriber subscribedTopicDescriber |
| ) throws PartitionAssignorException { |
| |
| Map<String, MemberAssignment> newTargetAssignment = new HashMap<>(); |
| |
| // Step 1 |
| Map<Uuid, Collection<String>> membersPerTopic = membersPerTopic( |
| assignmentSpec, |
| subscribedTopicDescriber |
| ); |
| |
| membersPerTopic.forEach((topicId, membersForTopic) -> { |
| int numPartitionsForTopic = subscribedTopicDescriber.numPartitions(topicId); |
| int minRequiredQuota = numPartitionsForTopic / membersForTopic.size(); |
| // Each member can get only ONE extra partition per topic after receiving the minimum quota. |
| int numMembersWithExtraPartition = numPartitionsForTopic % membersForTopic.size(); |
| |
| // Step 2 |
| Set<Integer> assignedStickyPartitionsForTopic = new HashSet<>(); |
| List<MemberWithRemainingAssignments> potentiallyUnfilledMembers = new ArrayList<>(); |
| |
| for (String memberId : membersForTopic) { |
| Set<Integer> assignedPartitionsForTopic = assignmentSpec.members().get(memberId) |
| .assignedPartitions().getOrDefault(topicId, Collections.emptySet()); |
| |
| int currentAssignmentSize = assignedPartitionsForTopic.size(); |
| List<Integer> currentAssignmentListForTopic = new ArrayList<>(assignedPartitionsForTopic); |
| |
| // If there were partitions from this topic that were previously assigned to this member, retain as many as possible. |
| // Sort the current assignment in ascending order since we want the same partition numbers from each topic |
| // to go to the same member, in order to facilitate joins in case of co-partitioned topics. |
| if (currentAssignmentSize > 0) { |
| int retainedPartitionsCount = min(currentAssignmentSize, minRequiredQuota); |
| Collections.sort(currentAssignmentListForTopic); |
| for (int i = 0; i < retainedPartitionsCount; i++) { |
| assignedStickyPartitionsForTopic |
| .add(currentAssignmentListForTopic.get(i)); |
| newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignment(new HashMap<>())) |
| .targetPartitions() |
| .computeIfAbsent(topicId, k -> new HashSet<>()) |
| .add(currentAssignmentListForTopic.get(i)); |
| } |
| } |
| |
| // Number of partitions required to meet the minRequiredQuota. |
| // There are 3 cases w.r.t the value of remaining: |
| // 1) remaining < 0: this means that the member has more than the min required amount. |
| // 2) If remaining = 0: member has the minimum required partitions, but it may get an extra partition, so it is a potentially unfilled member. |
| // 3) If remaining > 0: member doesn't have the minimum required partitions, so it should be added to potentiallyUnfilledMembers. |
| int remaining = minRequiredQuota - currentAssignmentSize; |
| |
| // Retain extra partitions as well when applicable. |
| if (remaining < 0 && numMembersWithExtraPartition > 0) { |
| numMembersWithExtraPartition--; |
| // Since we already added the minimumRequiredQuota of partitions in the previous step (until minReq - 1), we just need to |
| // add the extra partition that will be present at the index right after min quota was satisfied. |
| assignedStickyPartitionsForTopic |
| .add(currentAssignmentListForTopic.get(minRequiredQuota)); |
| newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignment(new HashMap<>())) |
| .targetPartitions() |
| .computeIfAbsent(topicId, k -> new HashSet<>()) |
| .add(currentAssignmentListForTopic.get(minRequiredQuota)); |
| } else { |
| MemberWithRemainingAssignments newPair = new MemberWithRemainingAssignments(memberId, remaining); |
| potentiallyUnfilledMembers.add(newPair); |
| } |
| } |
| |
| // Step 3 |
| // Find the difference between the total partitions per topic and the already assigned sticky partitions for the topic to get the unassigned partitions. |
| // List of unassigned partitions for topic contains the partitions in ascending order. |
| List<Integer> unassignedPartitionsForTopic = new ArrayList<>(); |
| for (int i = 0; i < numPartitionsForTopic; i++) { |
| if (!assignedStickyPartitionsForTopic.contains(i)) { |
| unassignedPartitionsForTopic.add(i); |
| } |
| } |
| |
| // Step 4 and Step 5 |
| // Account for the extra partitions if necessary and increase the required quota by 1. |
| // If remaining > 0 after increasing the required quota, assign the remaining number of partitions from the unassigned partitions list. |
| int unassignedPartitionsListStartPointer = 0; |
| for (MemberWithRemainingAssignments pair : potentiallyUnfilledMembers) { |
| String memberId = pair.memberId; |
| int remaining = pair.remaining; |
| if (numMembersWithExtraPartition > 0) { |
| remaining++; |
| numMembersWithExtraPartition--; |
| } |
| if (remaining > 0) { |
| List<Integer> partitionsToAssign = unassignedPartitionsForTopic |
| .subList(unassignedPartitionsListStartPointer, unassignedPartitionsListStartPointer + remaining); |
| unassignedPartitionsListStartPointer += remaining; |
| newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignment(new HashMap<>())) |
| .targetPartitions() |
| .computeIfAbsent(topicId, k -> new HashSet<>()) |
| .addAll(partitionsToAssign); |
| } |
| } |
| }); |
| |
| return new GroupAssignment(newTargetAssignment); |
| } |
| } |
| |