| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi.utilities.sources.helpers; |
| |
| import org.apache.hudi.DataSourceUtils; |
| import org.apache.hudi.common.config.TypedProperties; |
| import org.apache.hudi.common.util.Option; |
| import org.apache.hudi.exception.HoodieException; |
| import org.apache.hudi.exception.HoodieNotSupportedException; |
| import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; |
| |
| import org.apache.kafka.clients.consumer.KafkaConsumer; |
| import org.apache.kafka.common.PartitionInfo; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.log4j.LogManager; |
| import org.apache.log4j.Logger; |
| import org.apache.spark.streaming.kafka010.OffsetRange; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Helper to compute the offset ranges for reading data from Kafka, incrementally. |
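| * <p> |
| * A minimal usage sketch (the topic name, bootstrap servers and {@code metrics} instance are |
| * illustrative; any standard Kafka consumer configs, e.g. deserializers, go into the same props): |
| * <pre> |
| * TypedProperties props = new TypedProperties(); |
| * props.setProperty("hoodie.deltastreamer.source.kafka.topic", "impressions"); |
| * props.setProperty("bootstrap.servers", "localhost:9092"); |
| * KafkaOffsetGen offsetGen = new KafkaOffsetGen(props); |
| * OffsetRange[] ranges = offsetGen.getNextOffsetRanges(Option.empty(), Long.MAX_VALUE, metrics); |
| * </pre> |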
| */ |
| public class KafkaOffsetGen { |
| |
| private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class); |
| |
| public static class CheckpointUtils { |
| |
| /** |
| * Reconstruct the checkpoint offsets from the checkpoint string stored in the timeline |
| * (the inverse of {@link #offsetsToStr(OffsetRange[])}). |
| */ |
| public static HashMap<TopicPartition, Long> strToOffsets(String checkpointStr) { |
| HashMap<TopicPartition, Long> offsetMap = new HashMap<>(); |
| String[] splits = checkpointStr.split(","); |
| String topic = splits[0]; |
| for (int i = 1; i < splits.length; i++) { |
| String[] subSplits = splits[i].split(":"); |
| offsetMap.put(new TopicPartition(topic, Integer.parseInt(subSplits[0])), Long.parseLong(subSplits[1])); |
| } |
| return offsetMap; |
| } |
| |
| /** |
| * String representation of the checkpoint. |
| * <p> |
| * Format: topic1,0:offset0,1:offset1,2:offset2, ..... |
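| * <p> |
| * e.g. {@code topic1,0:100,1:250} means partition 0 of {@code topic1} was consumed up to |
| * offset 100 and partition 1 up to offset 250. |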
| */ |
| public static String offsetsToStr(OffsetRange[] ranges) { |
| StringBuilder sb = new StringBuilder(); |
| // at least 1 partition will be present. |
| sb.append(ranges[0].topic()).append(','); |
| sb.append(Arrays.stream(ranges).map(r -> String.format("%s:%d", r.partition(), r.untilOffset())) |
| .collect(Collectors.joining(","))); |
| return sb.toString(); |
| } |
| |
| /** |
| * Compute the offset ranges to read from Kafka, while handling newly added partitions, skews, and event limits. |
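| * <p> |
| * For example, with two partitions p0 and p1, fromOffsets {p0: 0, p1: 0}, toOffsets |
| * {p0: 30, p1: 500} and numEvents = 100: the first pass allocates ceil(100 / 2) = 50 events per |
| * partition, exhausting p0 at offset 30; the remaining 20 events then go to p1, yielding the |
| * ranges [0, 30) and [0, 70). |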
| * |
| * @param fromOffsetMap offsets where we left off last time |
| * @param toOffsetMap offsets of where each partition is currently at |
| * @param numEvents maximum number of events to read |
| * @return one offset range per partition, with untilOffset capped so the total count stays within numEvents |
| */ |
| public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOffsetMap, |
| Map<TopicPartition, Long> toOffsetMap, long numEvents) { |
| |
| Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition); |
| |
| // Create initial offset ranges for each 'to' partition, with from = to offsets. |
| // Newly added partitions are absent from fromOffsetMap and so start from offset 0. |
| OffsetRange[] ranges = toOffsetMap.keySet().stream().map(tp -> { |
| long fromOffset = fromOffsetMap.getOrDefault(tp, 0L); |
| return OffsetRange.create(tp, fromOffset, fromOffset); |
| }).sorted(byPartition).toArray(OffsetRange[]::new); |
| |
| long allocedEvents = 0; |
| Set<Integer> exhaustedPartitions = new HashSet<>(); |
| // keep going as long as there are events left to allocate and some partitions are not yet exhausted. |
| while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) { |
| long remainingEvents = numEvents - allocedEvents; |
| long eventsPerPartition = |
| (long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size())); |
| |
| // Allocate the remaining events to non-exhausted partitions, in round robin fashion |
| for (int i = 0; i < ranges.length; i++) { |
| OffsetRange range = ranges[i]; |
| if (!exhaustedPartitions.contains(range.partition())) { |
| long toOffsetMax = toOffsetMap.get(range.topicPartition()); |
| long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition); |
| if (toOffset == toOffsetMax) { |
| exhaustedPartitions.add(range.partition()); |
| } |
| allocedEvents += toOffset - range.untilOffset(); |
| // We need to recompute toOffset if allocedEvents exceeds numEvents, trimming the over-allocation. |
| if (allocedEvents > numEvents) { |
| long offsetsToAdd = Math.min(eventsPerPartition, (numEvents - allocedEvents)); |
| toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd); |
| } |
| ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset); |
| } |
| } |
| } |
| |
| return ranges; |
| } |
| |
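| /** |
| * Total number of new messages to be read, across all offset ranges. |
| */ |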
| public static long totalNewMessages(OffsetRange[] ranges) { |
| return Arrays.stream(ranges).mapToLong(OffsetRange::count).sum(); |
| } |
| } |
| |
| /** |
| * Kafka reset offset strategies. |
| */ |
| enum KafkaResetOffsetStrategies { |
| LATEST, EARLIEST |
| } |
| |
| /** |
| * Configs to be passed for this source. All standard Kafka consumer configs are also respected. |
| */ |
| public static class Config { |
| |
| private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic"; |
| private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents"; |
| private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST; |
| public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000; |
| public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE; |
| } |
| |
| private final HashMap<String, Object> kafkaParams; |
| private final TypedProperties props; |
| protected final String topicName; |
| |
| public KafkaOffsetGen(TypedProperties props) { |
| this.props = props; |
| kafkaParams = new HashMap<>(); |
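| // pass all properties through, so standard Kafka consumer configs are respected as well |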
| for (Object prop : props.keySet()) { |
| kafkaParams.put(prop.toString(), props.get(prop.toString())); |
| } |
| DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME)); |
| topicName = props.getString(Config.KAFKA_TOPIC_NAME); |
| } |
| |
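| /** |
| * Compute the offset ranges for the next batch to read from Kafka, starting from the last |
| * checkpoint (or the auto.offset.reset strategy when no checkpoint exists) and bounded by |
| * the source limit. |
| */ |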
| public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) { |
| |
| // Obtain current metadata for the topic |
| Map<TopicPartition, Long> fromOffsets; |
| Map<TopicPartition, Long> toOffsets; |
| try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) { |
| if (!checkTopicExists(consumer)) { |
| throw new HoodieException("Kafka topic:" + topicName + " does not exist"); |
| } |
| List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topicName); |
| Set<TopicPartition> topicPartitions = partitionInfoList.stream() |
| .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet()); |
| |
| // Determine the offset ranges to read from |
| if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) { |
| fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions); |
| metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer)); |
| } else { |
| KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies |
| .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase()); |
| switch (autoResetValue) { |
| case EARLIEST: |
| fromOffsets = consumer.beginningOffsets(topicPartitions); |
| break; |
| case LATEST: |
| fromOffsets = consumer.endOffsets(topicPartitions); |
| break; |
| default: |
| throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest'"); |
| } |
| } |
| |
| // Obtain the latest offsets. |
| toOffsets = consumer.endOffsets(topicPartitions); |
| } |
| |
| // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events) |
| long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP, |
| Config.maxEventsFromKafkaSource); |
| |
| long numEvents; |
| if (sourceLimit == Long.MAX_VALUE) { |
| numEvents = maxEventsToReadFromKafka; |
| LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka); |
| } else { |
| numEvents = sourceLimit; |
| } |
| |
| if (numEvents < toOffsets.size()) { |
| throw new HoodieException("sourceLimit should not be less than the number of kafka partitions"); |
| } |
| |
| return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents); |
| } |
| |
| /** |
| * Check whether the checkpoint offsets are still valid, i.e. not older than the earliest |
| * offsets currently retained by Kafka. If valid, return the checkpoint offsets; otherwise |
| * fall back to the earliest offsets. |
| */ |
| private Map<TopicPartition, Long> checkupValidOffsets(KafkaConsumer consumer, |
| Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) { |
| Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); |
| Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions); |
| |
| boolean checkpointOffsetsExpired = checkpointOffsets.entrySet().stream() |
| .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey())); |
| return checkpointOffsetsExpired ? earliestOffsets : checkpointOffsets; |
| } |
| |
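| /** |
| * Compute how far the last checkpoint lags behind the latest Kafka offsets, summed over all partitions. |
| */ |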
| private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) { |
| long delayCount = 0L; |
| Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); |
| Map<TopicPartition, Long> lastOffsets = consumer.endOffsets(topicPartitions); |
| |
| for (Map.Entry<TopicPartition, Long> entry : lastOffsets.entrySet()) { |
| long checkpointOffset = checkpointOffsets.getOrDefault(entry.getKey(), 0L); |
| delayCount += Math.max(entry.getValue() - checkpointOffset, 0L); |
| } |
| return delayCount; |
| } |
| |
| /** |
| * Check if the topic exists. |
| * |
| * @param consumer Kafka consumer |
| * @return true if the configured topic exists, false otherwise |
| */ |
| public boolean checkTopicExists(KafkaConsumer consumer) { |
| Map<String, List<PartitionInfo>> result = consumer.listTopics(); |
| return result.containsKey(topicName); |
| } |
| |
| public String getTopicName() { |
| return topicName; |
| } |
| |
| public HashMap<String, Object> getKafkaParams() { |
| return kafkaParams; |
| } |
| } |