blob: f21bd63735fb1b231fefc32e241ab36906116771 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class OptimizedUniformAssignmentBuilderTest {
private final UniformAssignor assignor = new UniformAssignor();
private final Uuid topic1Uuid = Uuid.fromString("T1-A4s3VTwiI5CTbEp6POw");
private final Uuid topic2Uuid = Uuid.fromString("T2-B4s3VTwiI5YHbPp6YUe");
private final Uuid topic3Uuid = Uuid.fromString("T3-CU8fVTLCz5YMkLoDQsa");
private final String topic1Name = "topic1";
private final String topic2Name = "topic2";
private final String topic3Name = "topic3";
private final String memberA = "A";
private final String memberB = "B";
private final String memberC = "C";
@Test
public void testOneMemberNoTopicSubscription() {
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(
Collections.singletonMap(
topic1Uuid,
new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
)
)
);
Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
memberA,
new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.emptySet(),
Collections.emptyMap()
)
);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
GroupAssignment groupAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertEquals(Collections.emptyMap(), groupAssignment.members());
}
@Test
public void testOneMemberSubscribedToNonexistentTopic() {
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(
Collections.singletonMap(
topic1Uuid,
new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
)
)
);
Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
memberA,
new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic2Uuid),
Collections.emptyMap()
)
);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
assertThrows(PartitionAssignorException.class,
() -> assignor.assign(groupSpec, subscribedTopicMetadata));
}
@Test
public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
mkMapOfPartitionRacks(2)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Collections.emptyMap()
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Collections.emptyMap()
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic3Uuid, 1)
));
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic3Uuid, 0)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@Test
public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
mkMapOfPartitionRacks(2)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic3Uuid),
Collections.emptyMap()
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic3Uuid),
Collections.emptyMap()
));
members.put(memberC, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic3Uuid),
Collections.emptyMap()
));
// Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic3Uuid, 0)
));
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic3Uuid, 1)
));
expectedAssignment.put(memberC,
Collections.emptyMap()
);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@Test
public void testValidityAndBalanceForLargeSampleSet() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
for (int i = 1; i < 100; i++) {
Uuid topicId = Uuid.randomUuid();
topicMetadata.put(topicId, new TopicMetadata(
topicId,
"topic-" + i,
3,
mkMapOfPartitionRacks(3)
));
}
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
for (int i = 1; i < 50; i++) {
members.put("member" + i, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
topicMetadata.keySet(),
Collections.emptyMap()
));
}
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
checkValidityAndBalance(members, computedAssignment);
}
@Test
public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
mkMapOfPartitionRacks(3)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
)
);
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForA
));
Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
)
);
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0)
));
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 1, 2)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@Test
public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() {
// Simulating adding partition to T1 and T2 - originally T1 -> 3 Partitions and T2 -> 3 Partitions
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
6,
mkMapOfPartitionRacks(6)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
5,
mkMapOfPartitionRacks(5)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0)
)
);
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForA
));
Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1, 2)
)
);
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2, 3, 5),
mkTopicAssignment(topic2Uuid, 0, 4)
));
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1, 4),
mkTopicAssignment(topic2Uuid, 1, 2, 3)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@Test
public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembersTwoTopics() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
mkMapOfPartitionRacks(3)
));
Map<String, AssignmentMemberSpec> members = new HashMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0)
));
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForA
));
Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1, 2)
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
));
// Add a new member to trigger a re-assignment.
members.put(memberC, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
Collections.emptyMap()
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2)
));
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
));
expectedAssignment.put(memberC, mkAssignment(
mkTopicAssignment(topic2Uuid, 0, 2)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@Test
public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeMembersTwoTopics() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
mkMapOfPartitionRacks(3)
));
Map<String, AssignmentMemberSpec> members = new HashMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
);
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForA
));
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
);
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
));
// Member C was removed
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0)
));
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1, 2)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@Test
public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWithTwoMembersTwoTopics() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
2,
mkMapOfPartitionRacks(2)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
2,
mkMapOfPartitionRacks(2)
));
// Initial subscriptions were [T1, T2]
Map<String, AssignmentMemberSpec> members = new HashMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
);
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic2Uuid),
currentAssignmentForA
));
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
);
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic2Uuid),
currentAssignmentForB
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic2Uuid, 0)
));
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic2Uuid, 1)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
/**
* Verifies that the given assignment is valid with respect to the given subscriptions.
* Validity requirements:
* - each member is subscribed to topics of all partitions assigned to it, and
* - each partition is assigned to no more than one member.
* Balance requirements:
* - the assignment is fully balanced (the numbers of topic partitions assigned to members differ by at most one), or
* - there is no topic partition that can be moved from one member to another with 2+ fewer topic partitions.
*
* @param members Members data structure from the assignment Spec.
* @param computedGroupAssignment Assignment computed by the uniform assignor.
*/
private void checkValidityAndBalance(
Map<String, AssignmentMemberSpec> members,
GroupAssignment computedGroupAssignment
) {
List<String> membersList = new ArrayList<>(computedGroupAssignment.members().keySet());
int numMembers = membersList.size();
List<Integer> totalAssignmentSizesOfAllMembers = new ArrayList<>(membersList.size());
membersList.forEach(member -> {
Map<Uuid, Set<Integer>> computedAssignmentForMember = computedGroupAssignment
.members().get(member).targetPartitions();
int sum = computedAssignmentForMember.values().stream().mapToInt(Set::size).sum();
totalAssignmentSizesOfAllMembers.add(sum);
});
for (int i = 0; i < numMembers; i++) {
String memberId = membersList.get(i);
Map<Uuid, Set<Integer>> computedAssignmentForMember =
computedGroupAssignment.members().get(memberId).targetPartitions();
// Each member is subscribed to topics of all the partitions assigned to it.
computedAssignmentForMember.keySet().forEach(topicId -> {
// Check if the topic exists in the subscription.
assertTrue(members.get(memberId).subscribedTopicIds().contains(topicId),
"Error: Partitions for topic " + topicId + " are assigned to member " + memberId +
" but it is not part of the members subscription ");
});
for (int j = i + 1; j < numMembers; j++) {
String otherMemberId = membersList.get(j);
Map<Uuid, Set<Integer>> computedAssignmentForOtherMember = computedGroupAssignment
.members().get(otherMemberId).targetPartitions();
// Each partition should be assigned to at most one member
computedAssignmentForMember.keySet().forEach(topicId -> {
Set<Integer> intersection = new HashSet<>();
if (computedAssignmentForOtherMember.containsKey(topicId)) {
intersection = new HashSet<>(computedAssignmentForMember.get(topicId));
intersection.retainAll(computedAssignmentForOtherMember.get(topicId));
}
assertTrue(
intersection.isEmpty(),
"Error : Member 1 " + memberId + " and Member 2 " + otherMemberId +
"have common partitions assigned to them " + computedAssignmentForOtherMember.get(topicId)
);
});
// Difference in the sizes of any two partitions should be 1 at max
int size1 = totalAssignmentSizesOfAllMembers.get(i);
int size2 = totalAssignmentSizesOfAllMembers.get(j);
assertTrue(
Math.abs(size1 - size2) <= 1,
"Size of one assignment is greater than the other assignment by more than one partition "
+ size1 + " " + size2 + "abs = " + Math.abs(size1 - size2)
);
}
}
}
}