/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * An {@link UnboundedSource} to read from Kafka, used by the {@link Read} transform in KafkaIO.
 * See {@link KafkaIO} for user-visible documentation and example usage.
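 *
 * <p>A minimal read sketch (hypothetical broker address and topic; the deserializers are the
 * stock ones from {@code org.apache.kafka.common.serialization}):
 *
 * <pre>{@code
 * PCollection<KafkaRecord<Long, String>> records =
 *     pipeline.apply(
 *         KafkaIO.<Long, String>read()
 *             .withBootstrapServers("broker:9092")
 *             .withTopic("my_topic")
 *             .withKeyDeserializer(LongDeserializer.class)
 *             .withValueDeserializer(StringDeserializer.class));
 * }</pre>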
*/
class KafkaUnboundedSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
/**
   * The partitions are evenly distributed among the splits. The number of splits returned is
   * {@code min(desiredNumSplits, totalNumPartitions)}, though callers should not depend on the
   * exact count.
   *
   * <p>It is important to assign the partitions deterministically so that a split can resume from
   * its last checkpoint. The Kafka partitions are sorted by {@code <topic, partition>} and then
   * assigned to splits in round-robin order.
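   *
   * <p>For example (hypothetical counts): five partitions {@code p0..p4} of a single topic and
   * {@code desiredNumSplits = 2} yield split 0 = {@code [p0, p2, p4]} and
   * split 1 = {@code [p1, p3]}.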
*/
@Override
public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOptions options)
throws Exception {
List<TopicPartition> partitions = new ArrayList<>(spec.getTopicPartitions());
// (a) fetch partitions for each topic
// (b) sort by <topic, partition>
// (c) round-robin assign the partitions to splits
if (partitions.isEmpty()) {
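      // No explicit partitions were configured; discover the current partitions of each topic
      // from the cluster.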
try (Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {
for (String topic : spec.getTopics()) {
for (PartitionInfo p : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
}
}
}
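    // Sort deterministically so that a resumed split is always assigned the same partitions.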
    partitions.sort(
        Comparator.comparing(TopicPartition::topic)
            .thenComparingInt(TopicPartition::partition));
    checkArgument(desiredNumSplits > 0, "desiredNumSplits should be greater than zero");
    checkState(
        !partitions.isEmpty(),
        "Could not find any partitions. Please check the Kafka configuration and topic names");
int numSplits = Math.min(desiredNumSplits, partitions.size());
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
for (int i = 0; i < numSplits; i++) {
assignments.add(new ArrayList<>());
}
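    // Round-robin: partition i goes to split (i % numSplits).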
for (int i = 0; i < partitions.size(); i++) {
assignments.get(i % numSplits).add(partitions.get(i));
}
List<KafkaUnboundedSource<K, V>> result = new ArrayList<>(numSplits);
for (int i = 0; i < numSplits; i++) {
List<TopicPartition> assignedToSplit = assignments.get(i);
LOG.info(
"Partitions assigned to split {} (total {}): {}",
i,
assignedToSplit.size(),
Joiner.on(",").join(assignedToSplit));
result.add(
new KafkaUnboundedSource<>(
spec.toBuilder()
.setTopics(Collections.emptyList())
.setTopicPartitions(assignedToSplit)
.build(),
i));
}
return result;
}
@Override
public KafkaUnboundedReader<K, V> createReader(
PipelineOptions options, KafkaCheckpointMark checkpointMark) {
if (spec.getTopicPartitions().isEmpty()) {
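      // This source was created directly rather than through split(); generate a single split
      // covering all partitions so that the reader still has explicit assignments.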
LOG.warn("Looks like generateSplits() is not called. Generate single split.");
try {
return new KafkaUnboundedReader<>(split(1, options).get(0), checkpointMark);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return new KafkaUnboundedReader<>(this, checkpointMark);
}
@Override
public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {
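    // KafkaCheckpointMark is serialized with Avro's reflection-based coder.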
return AvroCoder.of(KafkaCheckpointMark.class);
}
@Override
public boolean requiresDeduping() {
    // Kafka records are ordered within partitions. In addition, the checkpoint guarantees that
    // records are not consumed twice.
return false;
}
@Override
public Coder<KafkaRecord<K, V>> getOutputCoder() {
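    // Each element is a KafkaRecord: the key/value pair together with its Kafka metadata
    // (topic, partition, offset, timestamp), so the coder wraps the configured key and value
    // coders.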
return KafkaRecordCoder.of(spec.getKeyCoder(), spec.getValueCoder());
}
/////////////////////////////////////////////////////////////////////////////////////////////
private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedSource.class);
  private final Read<K, V> spec; // Contains all the relevant configuration of the source.
private final int id; // split id, mainly for debugging
public KafkaUnboundedSource(Read<K, V> spec, int id) {
this.spec = spec;
this.id = id;
}
Read<K, V> getSpec() {
return spec;
}
int getId() {
return id;
}
}