/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.StreamsException;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Default implementation of the {@link PartitionGrouper} interface that groups partitions by the partition id.
*
 * Join operations require that the topics of the joining entities are copartitioned, i.e., partitioned by the same key
 * and with the same number of partitions. Copartitioning is ensured by having the same number of partitions on the
 * joined topics and by using the same serialization together with the producer's default partitioner.
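 * <p>
 * A minimal usage sketch for illustration (the topic names, partition counts, and the {@code metadata}
 * {@link Cluster} instance are hypothetical and not part of this class):
 * <pre>{@code
 * PartitionGrouper grouper = new DefaultPartitionGrouper();
 * Map<Integer, Set<String>> topicGroups = new HashMap<>();
 * topicGroups.put(0, new HashSet<>(Arrays.asList("orders", "order-details")));
 * // with three partitions on both topics this yields tasks 0_0, 0_1 and 0_2,
 * // each assigned the identically numbered partition of both topics
 * Map<TaskId, Set<TopicPartition>> tasks = grouper.partitionGroups(topicGroups, metadata);
 * }</pre>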
*/
public class DefaultPartitionGrouper implements PartitionGrouper {

    /**
     * Generate tasks with the assigned topic partitions.
     *
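     * For illustration (hypothetical topic names and partition counts): given topic group {@code 1} containing
     * topic {@code A} with 3 partitions and topic {@code B} with 2 partitions, the generated tasks would be
     * <pre>{@code
     * 1_0 -> { A-0, B-0 }
     * 1_1 -> { A-1, B-1 }
     * 1_2 -> { A-2 }        (B has no partition 2, so task 1_2 is only assigned A-2)
     * }</pre>
     *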
     * @param topicGroups The map from the topic group id to the topics that need to be joined together
     * @param metadata    Metadata of the consuming cluster
     * @return The map from generated task ids to the assigned partitions
     */
    public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
        Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();

        for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
            Integer topicGroupId = entry.getKey();
            Set<String> topicGroup = entry.getValue();

            // a task is created for every partition id up to the largest partition count in this topic group
            int maxNumPartitions = maxNumPartitions(metadata, topicGroup);

            for (int partitionId = 0; partitionId < maxNumPartitions; partitionId++) {
                Set<TopicPartition> group = new HashSet<>(topicGroup.size());

                for (String topic : topicGroup) {
                    // only assign the partition if the topic actually has it; topics with fewer partitions
                    // than the group's maximum contribute nothing to the higher-numbered tasks
                    if (partitionId < metadata.partitionsForTopic(topic).size()) {
                        group.add(new TopicPartition(topic, partitionId));
                    }
                }
                groups.put(new TaskId(topicGroupId, partitionId), Collections.unmodifiableSet(group));
            }
        }

        return Collections.unmodifiableMap(groups);
    }

    /**
     * Determine the maximum number of partitions among the given topics, according to the cluster metadata;
     * for example, for two (hypothetical) topics with 3 and 2 partitions respectively, this returns 3.
     *
     * @throws StreamsException if no metadata can be received for a topic
     */
    protected int maxNumPartitions(Cluster metadata, Set<String> topics) {
        int maxNumPartitions = 0;
        for (String topic : topics) {
            List<PartitionInfo> infos = metadata.partitionsForTopic(topic);

            if (infos == null)
                throw new StreamsException("Topic not found during partition assignment: " + topic);

            int numPartitions = infos.size();
            if (numPartitions > maxNumPartitions)
                maxNumPartitions = numPartitions;
        }
        return maxNumPartitions;
    }
}