blob: 511db01c86219e16f13e07b708a734c9555fc0fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.consumer.TopicIds;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.TopicsImage;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class TargetAssignmentBuilderBenchmark {
@Param({"100", "500", "1000", "5000", "10000"})
private int memberCount;
@Param({"5", "10", "50"})
private int partitionsToMemberRatio;
@Param({"10", "100", "1000"})
private int topicCount;
private static final String GROUP_ID = "benchmark-group";
private static final int GROUP_EPOCH = 0;
private PartitionAssignor partitionAssignor;
private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
private TargetAssignmentBuilder targetAssignmentBuilder;
private GroupSpecImpl groupSpec;
private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
private final List<String> allTopicNames = new ArrayList<>();
private TopicsImage topicsImage;
@Setup(Level.Trial)
public void setup() {
// For this benchmark we will use the Uniform Assignor
// and a group that has a homogeneous subscription model.
partitionAssignor = new UniformAssignor();
subscriptionMetadata = generateMockSubscriptionMetadata();
Map<String, ConsumerGroupMember> members = generateMockMembers();
Map<String, Assignment> existingTargetAssignment = generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment();
ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("newMember")
.setSubscribedTopicNames(allTopicNames)
.build();
targetAssignmentBuilder = new TargetAssignmentBuilder(GROUP_ID, GROUP_EPOCH, partitionAssignor)
.withMembers(members)
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(HOMOGENEOUS)
.withTargetAssignment(existingTargetAssignment)
.withInvertedTargetAssignment(invertedTargetAssignment)
.withTopicsImage(topicsImage)
.addOrUpdateMember(newMember.memberId(), newMember);
}
private Map<String, ConsumerGroupMember> generateMockMembers() {
Map<String, ConsumerGroupMember> members = new HashMap<>();
for (int i = 0; i < memberCount - 1; i++) {
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member" + i)
.setSubscribedTopicNames(allTopicNames)
.build();
members.put("member" + i, member);
}
return members;
}
private Map<String, TopicMetadata> generateMockSubscriptionMetadata() {
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount;
for (int i = 0; i < topicCount; i++) {
String topicName = "topic-" + i;
Uuid topicId = Uuid.randomUuid();
allTopicNames.add(topicName);
TopicMetadata metadata = new TopicMetadata(
topicId,
topicName,
partitionsPerTopicCount,
Collections.emptyMap()
);
subscriptionMetadata.put(topicName, metadata);
AssignorBenchmarkUtils.addTopic(
delta,
topicId,
topicName,
partitionsPerTopicCount
);
}
topicsImage = delta.apply(MetadataProvenance.EMPTY).topics();
return subscriptionMetadata;
}
private Map<String, Assignment> generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment() {
Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>(topicCount);
subscriptionMetadata.forEach((topicName, topicMetadata) ->
topicMetadataMap.put(
topicMetadata.id(),
topicMetadata
)
);
createAssignmentSpec();
GroupAssignment groupAssignment = partitionAssignor.assign(
groupSpec,
new SubscribedTopicMetadata(topicMetadataMap)
);
invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(groupAssignment);
Map<String, Assignment> initialTargetAssignment = new HashMap<>(memberCount);
for (Map.Entry<String, MemberAssignment> entry : groupAssignment.members().entrySet()) {
String memberId = entry.getKey();
Map<Uuid, Set<Integer>> topicPartitions = entry.getValue().targetPartitions();
Assignment assignment = new Assignment(topicPartitions);
initialTargetAssignment.put(memberId, assignment);
}
return initialTargetAssignment;
}
private void createAssignmentSpec() {
Map<String, AssignmentMemberSpec> members = new HashMap<>();
for (int i = 0; i < memberCount - 1; i++) {
String memberId = "member" + i;
members.put(memberId, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
new TopicIds(new HashSet<>(allTopicNames), topicsImage),
Collections.emptyMap()
));
}
groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap());
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void build() {
targetAssignmentBuilder.build();
}
}