| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.crunch.kafka.inputformat; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.crunch.Pair; |
| import org.apache.crunch.io.FormatBundle; |
| import org.apache.hadoop.conf.Configurable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.mapreduce.InputFormat; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.kafka.common.TopicPartition; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.regex.Pattern; |
| |
| /** |
| * Basic input format for reading data from Kafka. Data is read and maintained in its raw byte form, wrapped inside a |
| * {@link BytesWritable} instance. |
| * |
| * <p> |
| * The input format's configuration should be populated with the convenience method |
| * {@link #writeOffsetsToConfiguration(Map, Configuration)}. This ensures the Kafka offset information is available |
| * when the input format {@link #getSplits(JobContext) creates its splits} |
| * and {@link #createRecordReader(InputSplit, TaskAttemptContext) readers}. |
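| * <p> |
| * For example, a driver might populate a Hadoop {@code Job}'s configuration as follows (a minimal sketch; the |
| * topic name "abc" and the offset values are illustrative, and in practice the offsets would be looked up from |
| * Kafka for the desired start and end positions): |
| * <pre>{@code |
| * Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); |
| * offsets.put(new TopicPartition("abc", 0), Pair.of(0L, 100L)); |
| * offsets.put(new TopicPartition("abc", 1), Pair.of(0L, 250L)); |
| * KafkaInputFormat.writeOffsetsToConfiguration(offsets, job.getConfiguration()); |
| * }</pre> |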
| * |
| * <p> |
| * To suppress warnings generated by unused configs in the {@link org.apache.kafka.clients.consumer.ConsumerConfig |
| * ConsumerConfig}, use {@link #tagExistingKafkaConnectionProperties(Properties) tagExistingKafkaConnectionProperties} |
| * and {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey} to prefix Kafka connection |
| * properties with "org.apache.crunch.kafka.connection.properties"; the original properties can then be recovered |
| * with {@link #getConnectionPropertyFromKey(String) getConnectionPropertyFromKey} and |
| * {@link #filterConnectionProperties(Properties) filterConnectionProperties}. |
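| * <p> |
| * A minimal sketch of that round trip (the property value is illustrative only): |
| * <pre>{@code |
| * Properties kafkaProps = new Properties(); |
| * kafkaProps.setProperty("bootstrap.servers", "broker:9092"); |
| * // every key is prefixed with "org.apache.crunch.kafka.connection.properties" |
| * Properties tagged = KafkaInputFormat.tagExistingKafkaConnectionProperties(kafkaProps); |
| * // strips the prefix again, recovering the original "bootstrap.servers" entry |
| * Properties recovered = KafkaInputFormat.filterConnectionProperties(tagged); |
| * }</pre> |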
| */ |
| public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> implements Configurable { |
| |
| /** |
| * Constant for constructing configuration keys for the input format. |
| */ |
| private static final String KAFKA_INPUT_OFFSETS_BASE = "org.apache.crunch.kafka.offsets.topic"; |
| |
| /** |
| * Constant used for building configuration keys and specifying partitions. |
| */ |
| private static final String PARTITIONS = "partitions"; |
| |
| /** |
| * Constant used for building configuration keys and specifying the start of a partition. |
| */ |
| private static final String START = "start"; |
| |
| /** |
| * Constant used for building configuration keys and specifying the end of a partition. |
| */ |
| private static final String END = "end"; |
| |
| /** |
| * Regex to discover all of the defined partitions which should be consumed by the input format. |
| */ |
| private static final String TOPIC_KEY_REGEX = KAFKA_INPUT_OFFSETS_BASE + "\\..*\\." + PARTITIONS + "$"; |
| |
| /** |
| * Constant for constructing configuration keys for the Kafka connection properties. |
| */ |
| private static final String KAFKA_CONNECTION_PROPERTY_BASE = "org.apache.crunch.kafka.connection.properties"; |
| |
| /** |
| * Regex to discover all of the defined Kafka connection properties which should be passed to the ConsumerConfig. |
| */ |
| private static final Pattern CONNECTION_PROPERTY_REGEX = |
| Pattern.compile(Pattern.quote(KAFKA_CONNECTION_PROPERTY_BASE) + "\\..*$"); |
| |
| private Configuration configuration; |
| |
| @Override |
| public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException { |
| Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(getConf()); |
| List<InputSplit> splits = new LinkedList<>(); |
| for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) { |
| TopicPartition topicPartition = entry.getKey(); |
| |
| long start = entry.getValue().first(); |
| long end = entry.getValue().second(); |
| //skip partitions that have no data to read (start == end) |
| if (start != end) { |
| splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), start, end)); |
| } |
| } |
| |
| return splits; |
| } |
| |
| @Override |
| public RecordReader<BytesWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) |
| throws IOException, InterruptedException { |
| return new KafkaRecordReader<>(); |
| } |
| |
| @Override |
| public void setConf(Configuration configuration) { |
| this.configuration = configuration; |
| } |
| |
| @Override |
| public Configuration getConf() { |
| return configuration; |
| } |
| |
| |
| //The following methods are used for reading and writing Kafka partition offset information into Hadoop's Configuration |
| //objects and into Crunch's FormatBundle. A given Kafka topic may have one or many partitions, and each partition |
| //needs a start and an end offset. Assuming a topic "abc" with 2 partitions, the configuration would be populated |
| //with the following: |
| // org.apache.crunch.kafka.offsets.topic.abc.partitions = [0,1] |
| // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start = <partition start> |
| // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end = <partition end> |
| // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.start = <partition start> |
| // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.end = <partition end> |
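| // |
| //A minimal sketch of the read side (assuming a Configuration populated as above, here named conf): |
| // Map<TopicPartition, Pair<Long, Long>> offsets = KafkaInputFormat.getOffsets(conf); |
| // Pair<Long, Long> range = offsets.get(new TopicPartition("abc", 0)); //the (start, end) pair for partition 0 |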
| |
| /** |
| * Writes the start and end offsets for the provided topic partitions to the {@code bundle}. |
| * |
| * @param offsets The starting and ending offsets for the topics and partitions. |
| * @param bundle the bundle into which the information should be persisted. |
| */ |
| public static void writeOffsetsToBundle(Map<TopicPartition, Pair<Long, Long>> offsets, FormatBundle bundle) { |
| for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) { |
| bundle.set(entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| /** |
| * Writes the start and end offsets for the provided topic partitions to the {@code config}. |
| * |
| * @param offsets The starting and ending offsets for the topics and partitions. |
| * @param config the config into which the information should be persisted. |
| */ |
| public static void writeOffsetsToConfiguration(Map<TopicPartition, Pair<Long, Long>> offsets, Configuration config) { |
| for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) { |
| config.set(entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| /** |
| * Reads the {@code configuration} to determine which topics, partitions, and offsets should be used for reading data. |
| * |
| * @param configuration the configuration from which to derive the topics, partitions, and offsets to read. |
| * @return a map of {@link TopicPartition} to a pair of start and end offsets. |
| * @throws IllegalStateException if the {@code configuration} does not have the start and end offsets set properly |
| * for a partition. |
| */ |
| public static Map<TopicPartition, Pair<Long, Long>> getOffsets(Configuration configuration) { |
| Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); |
| //find configuration for all of the topics with defined partitions |
| Map<String, String> topicPartitionKeys = configuration.getValByRegex(TOPIC_KEY_REGEX); |
| |
| //for each topic, process its partitions |
| for (String key : topicPartitionKeys.keySet()) { |
| String topic = getTopicFromKey(key); |
| int[] partitions = configuration.getInts(key); |
| //for each partition find and add the start/end offset |
| for (int partitionId : partitions) { |
| TopicPartition topicPartition = new TopicPartition(topic, partitionId); |
| long start = configuration.getLong(generatePartitionStartKey(topic, partitionId), Long.MIN_VALUE); |
| long end = configuration.getLong(generatePartitionEndKey(topic, partitionId), Long.MIN_VALUE); |
| |
| if (start == Long.MIN_VALUE || end == Long.MIN_VALUE) { |
| throw new IllegalStateException("The " + topicPartition + " has an invalid start:" + start |
| + " or end:" + end + " offset configured."); |
| } |
| |
| offsets.put(topicPartition, Pair.of(start, end)); |
| } |
| } |
| |
| return offsets; |
| } |
| |
| private static Map<String, String> generateValues(Map<TopicPartition, Pair<Long, Long>> offsets) { |
| Map<String, String> offsetConfigValues = new HashMap<>(); |
| Map<String, Set<Integer>> topicsPartitions = new HashMap<>(); |
| |
| for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) { |
| TopicPartition topicPartition = entry.getKey(); |
| String topic = topicPartition.topic(); |
| int partition = topicPartition.partition(); |
| String startKey = generatePartitionStartKey(topic, partition); |
| String endKey = generatePartitionEndKey(topic, partition); |
| //Add the start and end offsets for a specific partition |
| offsetConfigValues.put(startKey, Long.toString(entry.getValue().first())); |
| offsetConfigValues.put(endKey, Long.toString(entry.getValue().second())); |
| |
| Set<Integer> partitions = topicsPartitions.get(topic); |
| if (partitions == null) { |
| partitions = new HashSet<>(); |
| topicsPartitions.put(topic, partitions); |
| } |
| partitions.add(partition); |
| } |
| |
| //generate the partitions values for each topic |
| for (Map.Entry<String, Set<Integer>> entry : topicsPartitions.entrySet()) { |
| String key = KAFKA_INPUT_OFFSETS_BASE + "." + entry.getKey() + "." + PARTITIONS; |
| Set<Integer> partitions = entry.getValue(); |
| String partitionsString = StringUtils.join(partitions, ","); |
| offsetConfigValues.put(key, partitionsString); |
| } |
| |
| return offsetConfigValues; |
| } |
| |
| static String generatePartitionStartKey(String topic, int partition) { |
| return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + START; |
| } |
| |
| static String generatePartitionEndKey(String topic, int partition) { |
| return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + END; |
| } |
| |
| static String generateTopicPartitionsKey(String topic) { |
| return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS; |
| } |
| |
| static String getTopicFromKey(String key) { |
| //strip off the base key + a trailing "." |
| String value = key.substring(KAFKA_INPUT_OFFSETS_BASE.length() + 1); |
| //strip off the end part + a preceding "." |
| value = value.substring(0, (value.length() - (PARTITIONS.length() + 1))); |
| |
| return value; |
| } |
| |
| // The following methods are convenience methods for dealing with Kafka connection properties. This includes: |
| // - writing Kafka connection properties to a FormatBundle |
| // - generating tagged Kafka connection properties using the prefix "org.apache.crunch.kafka.connection.properties" |
| // - retrieving Kafka connection properties prefixed by "org.apache.crunch.kafka.connection.properties" |
| // - filtering out Kafka connection properties from a Properties object |
| // - tagging all properties in a Properties object with the Kafka connection properties prefix |
| // The tagging of the Kafka connection properties allows for suppression of "isn't a known config" ConsumerConfig warnings that |
| // are generated by unused properties carried over from a Hadoop configuration. |
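| // |
| // For example (a minimal sketch; the broker address is illustrative, and taggedProps stands for a hypothetical |
| // Properties view of the tagged entries carried over in the job's configuration): |
| //   FormatBundle bundle = FormatBundle.forInput(KafkaInputFormat.class); |
| //   Properties kafkaProps = new Properties(); |
| //   kafkaProps.setProperty("bootstrap.servers", "broker:9092"); |
| //   writeConnectionPropertiesToBundle(tagExistingKafkaConnectionProperties(kafkaProps), bundle); |
| //   Properties connectionProperties = filterConnectionProperties(taggedProps); |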
| |
| /** |
| * Writes the Kafka connection properties to the {@code bundle}. |
| * |
| * @param connectionProperties the Kafka connection properties |
| * @param bundle the bundle into which the information should be persisted. |
| */ |
| public static void writeConnectionPropertiesToBundle(Properties connectionProperties, FormatBundle bundle) { |
| for (final String name : connectionProperties.stringPropertyNames()) { |
| bundle.set(name, connectionProperties.getProperty(name)); |
| } |
| } |
| |
| /** |
| * Prefixes a given property with "org.apache.crunch.kafka.connection.properties" to allow for filtering with |
| * {@link #filterConnectionProperties(Properties) filterConnectionProperties}. |
| * |
| * @param property the Kafka connection property that will be prefixed for retrieval at a later time. |
| * @return the property prefixed "org.apache.crunch.kafka.connection.properties" |
| */ |
| static String generateConnectionPropertyKey(String property) { |
| return KAFKA_CONNECTION_PROPERTY_BASE + "." + property; |
| } |
| |
| /** |
| * Retrieves the original property that was tagged using {@link #generateConnectionPropertyKey(String) |
| * generateConnectionPropertyKey}. |
| * |
| * @param key the key that was tagged using {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey}. |
| * @return the original property prior to tagging. |
| */ |
| static String getConnectionPropertyFromKey(String key) { |
| // Strip off the base key + a trailing "." |
| return key.substring(KAFKA_CONNECTION_PROPERTY_BASE.length() + 1); |
| } |
| |
| /** |
| * Generates a {@link Properties} object containing the properties in {@code connectionProperties}, but with every |
| * property prefixed with "org.apache.crunch.kafka.connection.properties". |
| * |
| * @param connectionProperties the properties to be prefixed with "org.apache.crunch.kafka.connection.properties" |
| * @return a {@link Properties} object representing Kafka connection properties |
| */ |
| public static Properties tagExistingKafkaConnectionProperties(Properties connectionProperties) { |
| Properties taggedProperties = new Properties(); |
| |
| for (final String name : connectionProperties.stringPropertyNames()) { |
| taggedProperties.put(generateConnectionPropertyKey(name), connectionProperties.getProperty(name)); |
| } |
| |
| return taggedProperties; |
| } |
| |
| /** |
| * Filters out Kafka connection properties that were tagged using {@link #generateConnectionPropertyKey(String) |
| * generateConnectionPropertyKey}. |
| * |
| * @param props the properties to be filtered. |
| * @return the properties containing Kafka connection information that were tagged using |
| * {@link #generateConnectionPropertyKey(String)}. |
| */ |
| public static Properties filterConnectionProperties(Properties props) { |
| Properties filteredProperties = new Properties(); |
| |
| for (final String name : props.stringPropertyNames()) { |
| if (CONNECTION_PROPERTY_REGEX.matcher(name).matches()) { |
| filteredProperties.put(getConnectionPropertyFromKey(name), props.getProperty(name)); |
| } |
| } |
| |
| return filteredProperties; |
| } |
| } |