| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.crunch.kafka.inputformat; |
| |
| |
| import kafka.api.OffsetRequest; |
| import org.apache.crunch.Pair; |
| import org.apache.crunch.io.FormatBundle; |
| import org.apache.crunch.kafka.ClusterTest; |
| import org.apache.crunch.kafka.KafkaSource; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.common.TopicPartition; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TestName; |
| import org.junit.runner.RunWith; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.Mock; |
| import org.mockito.runners.MockitoJUnitRunner; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets; |
| import static org.hamcrest.CoreMatchers.notNullValue; |
| import static org.hamcrest.core.Is.is; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.matchers.JUnitMatchers.hasItem; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
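| /** |
| * Integration tests for {@link KafkaInputFormat}, covering split generation from stored offsets, reading records |
| * back from a test Kafka cluster, and round-tripping offset and connection properties through a |
| * {@link FormatBundle} and a Hadoop {@link Configuration}. |
| */ |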
| @RunWith(MockitoJUnitRunner.class) |
| public class KafkaInputFormatIT { |
| |
| @Rule |
| public TestName testName = new TestName(); |
| |
| @Mock |
| private TaskAttemptContext taskContext; |
| |
| @Mock |
| private FormatBundle bundle; |
| private Properties consumerProps; |
| private Configuration config; |
| private String topic; |
| |
| @BeforeClass |
| public static void setup() throws Exception { |
| ClusterTest.startTest(); |
| } |
| |
| @AfterClass |
| public static void cleanup() throws Exception { |
| ClusterTest.endTest(); |
| } |
| |
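| /** |
| * Uses the test method name as the topic and registers {@link KafkaSource.BytesDeserializer} for both keys and |
| * values in the consumer properties and the Hadoop configuration via the prefixed connection property keys. |
| */ |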
| @Before |
| public void setupTest() { |
| topic = testName.getMethodName(); |
| consumerProps = ClusterTest.getConsumerProperties(); |
| |
| consumerProps.setProperty(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), |
| KafkaSource.BytesDeserializer.class.getName()); |
| consumerProps.setProperty(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), |
| KafkaSource.BytesDeserializer.class.getName()); |
| |
| config = ClusterTest.getConsumerConfig(); |
| |
| config.set(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), |
| KafkaSource.BytesDeserializer.class.getName()); |
| config.set(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), |
| KafkaSource.BytesDeserializer.class.getName()); |
| } |
| |
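| /** |
| * Verifies that getSplits produces one {@link KafkaInputSplit} per partition, carrying the start and end offsets |
| * that were written to the {@link Configuration}. |
| */ |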
| @Test |
| public void getSplitsFromFormat() throws IOException, InterruptedException { |
| ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10); |
| Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic); |
| Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic); |
| |
| Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); |
| for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { |
| Long endingOffset = endOffsets.get(entry.getKey()); |
| offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset)); |
| } |
| |
| KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); |
| |
| KafkaInputFormat inputFormat = new KafkaInputFormat(); |
| inputFormat.setConf(config); |
| List<InputSplit> splits = inputFormat.getSplits(null); |
| |
| assertThat(splits.size(), is(offsets.size())); |
| |
| for (InputSplit split : splits) { |
| KafkaInputSplit inputSplit = (KafkaInputSplit) split; |
| Pair<Long, Long> startEnd = offsets.get(inputSplit.getTopicPartition()); |
| assertThat(inputSplit.getStartingOffset(), is(startEnd.first())); |
| assertThat(inputSplit.getEndingOffset(), is(startEnd.second())); |
| } |
| } |
| |
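| /** |
| * Verifies that partitions whose start and end offsets are equal contribute no splits. |
| */ |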
| @Test |
| public void getSplitsSameStartEnd() throws IOException, InterruptedException { |
| |
| Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); |
| for (int i = 0; i < 10; i++) { |
| offsets.put(new TopicPartition(topic, i), Pair.of((long)i, (long)i)); |
| } |
| |
| KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); |
| |
| KafkaInputFormat inputFormat = new KafkaInputFormat(); |
| inputFormat.setConf(config); |
| List<InputSplit> splits = inputFormat.getSplits(null); |
| |
| assertThat(splits.size(), is(0)); |
| } |
| |
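| /** |
| * Verifies end-to-end reading: record readers created from the splits return every written key, each split yields |
| * exactly its partition's worth of records, and the number of unique keys read matches the number written. |
| */ |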
| @Test |
| public void getSplitsCreateReaders() throws IOException, InterruptedException { |
| List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10); |
| Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic); |
| Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic); |
| |
| Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); |
| for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { |
| Long endingOffset = endOffsets.get(entry.getKey()); |
| offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset)); |
| } |
| |
| KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); |
| |
| KafkaInputFormat inputFormat = new KafkaInputFormat(); |
| inputFormat.setConf(config); |
| List<InputSplit> splits = inputFormat.getSplits(null); |
| |
| assertThat(splits.size(), is(offsets.size())); |
| |
| for (InputSplit split : splits) { |
| KafkaInputSplit inputSplit = (KafkaInputSplit) split; |
| Pair<Long, Long> startEnd = offsets.get(inputSplit.getTopicPartition()); |
| assertThat(inputSplit.getStartingOffset(), is(startEnd.first())); |
| assertThat(inputSplit.getEndingOffset(), is(startEnd.second())); |
| } |
| |
| //create readers and consume the data |
| when(taskContext.getConfiguration()).thenReturn(config); |
| Set<String> keysRead = new HashSet<>(); |
| //read all data from all splits |
| for (InputSplit split : splits) { |
| KafkaInputSplit inputSplit = (KafkaInputSplit) split; |
| long start = inputSplit.getStartingOffset(); |
| long end = inputSplit.getEndingOffset(); |
| |
| RecordReader<BytesWritable, BytesWritable> recordReader = inputFormat.createRecordReader(split, taskContext); |
| recordReader.initialize(split, taskContext); |
| |
| int numRecordsFound = 0; |
| String currentKey; |
| while (recordReader.nextKeyValue()) { |
| //copyBytes() returns only the valid bytes; getBytes() may include trailing padding from the backing array |
| currentKey = new String(recordReader.getCurrentKey().copyBytes()); |
| keysRead.add(currentKey); |
| assertThat(keys, hasItem(currentKey)); |
| assertThat(recordReader.getCurrentValue(), is(notNullValue())); |
| numRecordsFound++; |
| } |
| recordReader.close(); |
| |
| //assert that it encountered a partition's worth of data |
| assertThat(((long) numRecordsFound), is(end - start)); |
| } |
| |
| //validate that the number of unique keys read matches the number written |
| assertThat(keysRead.size(), is(keys.size())); |
| } |
| |
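| /** |
| * Verifies that writeOffsetsToBundle stores the topic's partition list plus a start and an end offset entry for |
| * every partition. |
| */ |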
| @Test |
| public void writeOffsetsToFormatBundle() { |
| Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); |
| String topic = testName.getMethodName(); |
| int numPartitions = 10; |
| for (int i = 0; i < numPartitions; i++) { |
| TopicPartition tAndP = new TopicPartition(topic, i); |
| offsets.put(tAndP, Pair.of((long) i, i * 10L)); |
| } |
| |
| KafkaInputFormat.writeOffsetsToBundle(offsets, bundle); |
| |
| ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class); |
| ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class); |
| |
| //number of partitions * 2 (start and end offsets) + 1 entry for the topic's partition list |
| verify(bundle, times((numPartitions * 2) + 1)).set(keyCaptor.capture(), valueCaptor.capture()); |
| |
| List<String> keyValues = keyCaptor.getAllValues(); |
| List<String> valueValues = valueCaptor.getAllValues(); |
| |
| String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic); |
| assertThat(keyValues, hasItem(partitionKey)); |
| |
| String partitions = valueValues.get(keyValues.indexOf(partitionKey)); |
| List<String> parts = Arrays.asList(partitions.split(",")); |
| |
| for (int i = 0; i < numPartitions; i++) { |
| assertThat(keyValues, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic))); |
| String startKey = KafkaInputFormat.generatePartitionStartKey(topic, i); |
| String endKey = KafkaInputFormat.generatePartitionEndKey(topic, i); |
| assertThat(keyValues, hasItem(startKey)); |
| assertThat(keyValues, hasItem(endKey)); |
| assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i))); |
| assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L))); |
| assertThat(parts, hasItem(Long.toString(i))); |
| } |
| } |
| |
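| /** |
| * Same as {@link #writeOffsetsToFormatBundle()} but with a '.' in the topic name, to ensure the generated property |
| * keys handle topic names containing the key separator. |
| */ |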
| @Test |
| public void writeOffsetsToFormatBundleSpecialCharacters() { |
| Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); |
| String topic = "partitions." + testName.getMethodName(); |
| int numPartitions = 10; |
| for (int i = 0; i < numPartitions; i++) { |
| TopicPartition tAndP = new TopicPartition(topic, i); |
| offsets.put(tAndP, Pair.of((long) i, i * 10L)); |
| } |
| |
| KafkaInputFormat.writeOffsetsToBundle(offsets, bundle); |
| |
| ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class); |
| ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class); |
| |
| //number of partitions * 2 (start and end offsets) + 1 entry for the topic's partition list |
| verify(bundle, times((numPartitions * 2) + 1)).set(keyCaptor.capture(), valueCaptor.capture()); |
| |
| List<String> keyValues = keyCaptor.getAllValues(); |
| List<String> valueValues = valueCaptor.getAllValues(); |
| |
| String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic); |
| assertThat(keyValues, hasItem(partitionKey)); |
| |
| String partitions = valueValues.get(keyValues.indexOf(partitionKey)); |
| List<String> parts = Arrays.asList(partitions.split(",")); |
| |
| for (int i = 0; i < numPartitions; i++) { |
| assertThat(keyValues, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic))); |
| String startKey = KafkaInputFormat.generatePartitionStartKey(topic, i); |
| String endKey = KafkaInputFormat.generatePartitionEndKey(topic, i); |
| assertThat(keyValues, hasItem(startKey)); |
| assertThat(keyValues, hasItem(endKey)); |
| assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i))); |
| assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L))); |
| assertThat(parts, hasItem(Long.toString(i))); |
| } |
| } |
| |
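| /** |
| * Verifies that offsets spanning multiple topics are written to the bundle with one partition list per topic and |
| * start and end entries for every partition. |
| */ |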
| @Test |
| public void writeOffsetsToFormatBundleMultipleTopics() { |
| Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); |
| Set<String> topics = new HashSet<>(); |
| |
| int numPartitions = 10; |
| int numTopics = 10; |
| for (int j = 0; j < numTopics; j++) { |
| String topic = testName.getMethodName() + j; |
| topics.add(topic); |
| for (int i = 0; i < numPartitions; i++) { |
| TopicPartition tAndP = new TopicPartition(topic, i); |
| offsets.put(tAndP, Pair.of((long) i, i * 10L)); |
| } |
| } |
| |
| KafkaInputFormat.writeOffsetsToBundle(offsets, bundle); |
| |
| ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class); |
| ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class); |
| |
| //number of topics * number of partitions * 2 (start and end offsets) + 1 entry per topic for its partition list |
| verify(bundle, times((numTopics * numPartitions * 2) + numTopics)).set(keyCaptor.capture(), valueCaptor.capture()); |
| |
| List<String> keyValues = keyCaptor.getAllValues(); |
| List<String> valueValues = valueCaptor.getAllValues(); |
| |
| for (String topic : topics) { |
| |
| String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic); |
| assertThat(keyValues, hasItem(partitionKey)); |
| |
| String partitions = valueValues.get(keyValues.indexOf(partitionKey)); |
| List<String> parts = Arrays.asList(partitions.split(",")); |
| |
| for (int i = 0; i < numPartitions; i++) { |
| assertThat(keyValues, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic))); |
| String startKey = KafkaInputFormat.generatePartitionStartKey(topic, i); |
| String endKey = KafkaInputFormat.generatePartitionEndKey(topic, i); |
| assertThat(keyValues, hasItem(startKey)); |
| assertThat(keyValues, hasItem(endKey)); |
| assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i))); |
| assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L))); |
| assertThat(parts, hasItem(Long.toString(i))); |
| } |
| } |
| } |
| |
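| /** |
| * Verifies that offsets written to a {@link Configuration} are read back unchanged by getOffsets. |
| */ |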
| @Test |
| public void getOffsetsFromConfig() { |
| Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); |
| Set<String> topics = new HashSet<>(); |
| |
| int numPartitions = 10; |
| int numTopics = 10; |
| for (int j = 0; j < numTopics; j++) { |
| String topic = testName.getMethodName() + ".partitions" + j; |
| topics.add(topic); |
| for (int i = 0; i < numPartitions; i++) { |
| TopicPartition tAndP = new TopicPartition(topic, i); |
| offsets.put(tAndP, Pair.of((long) i, i * 10L)); |
| } |
| } |
| |
| Configuration config = new Configuration(false); |
| |
| KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); |
| |
| Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config); |
| |
| assertThat(returnedOffsets.size(), is(offsets.size())); |
| for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) { |
| Pair<Long, Long> valuePair = returnedOffsets.get(entry.getKey()); |
| assertThat(valuePair, is(entry.getValue())); |
| } |
| } |
| |
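| /** |
| * Verifies that connection property names are prefixed with the expected connection properties namespace. |
| */ |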
| @Test |
| public void generateConnectionPropertyKey() { |
| String propertyName = "some.property"; |
| String actual = KafkaInputFormat.generateConnectionPropertyKey(propertyName); |
| String expected = "org.apache.crunch.kafka.connection.properties.some.property"; |
| assertThat(actual, is(expected)); |
| } |
| |
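| /** |
| * Verifies that the original property name is recovered from a prefixed connection property key. |
| */ |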
| @Test |
| public void getConnectionPropertyFromKey() { |
| String prefixedConnectionProperty = "org.apache.crunch.kafka.connection.properties.some.property"; |
| String actual = KafkaInputFormat.getConnectionPropertyFromKey(prefixedConnectionProperty); |
| String expected = "some.property"; |
| assertThat(actual, is(expected)); |
| } |
| |
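| /** |
| * Verifies that connection properties are copied as-is into a {@link FormatBundle}. |
| */ |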
| @Test |
| public void writeConnectionPropertiesToBundle() { |
| FormatBundle<KafkaInputFormat> actual = FormatBundle.forInput(KafkaInputFormat.class); |
| Properties connectionProperties = new Properties(); |
| connectionProperties.put("key1", "value1"); |
| connectionProperties.put("key2", "value2"); |
| KafkaInputFormat.writeConnectionPropertiesToBundle(connectionProperties, actual); |
| |
| FormatBundle<KafkaInputFormat> expected = FormatBundle.forInput(KafkaInputFormat.class); |
| expected.set("key1", "value1"); |
| expected.set("key2", "value2"); |
| |
| assertThat(actual, is(expected)); |
| } |
| |
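| /** |
| * Verifies that only properties carrying the connection prefix are kept and that the prefix is stripped from the |
| * returned keys. |
| */ |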
| @Test |
| public void filterConnectionProperties() { |
| Properties props = new Properties(); |
| props.put("org.apache.crunch.kafka.connection.properties.key1", "value1"); |
| props.put("org.apache.crunch.kafka.connection.properties.key2", "value2"); |
| props.put("org_apache_crunch_kafka_connection_properties.key3", "value3"); |
| props.put("org.apache.crunch.another.prefix.properties.key4", "value4"); |
| |
| Properties actual = KafkaInputFormat.filterConnectionProperties(props); |
| Properties expected = new Properties(); |
| expected.put("key1", "value1"); |
| expected.put("key2", "value2"); |
| |
| assertThat(actual, is(expected)); |
| } |
| |
| |
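| /** |
| * Verifies that getOffsets fails with an {@link IllegalStateException} when a partition's start offset is missing |
| * from the configuration. |
| */ |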
| @Test(expected=IllegalStateException.class) |
| public void getOffsetsFromConfigMissingStart() { |
| Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); |
| Set<String> topics = new HashSet<>(); |
| |
| int numPartitions = 10; |
| int numTopics = 10; |
| for (int j = 0; j < numTopics; j++) { |
| String topic = testName.getMethodName() + ".partitions" + j; |
| topics.add(topic); |
| for (int i = 0; i < numPartitions; i++) { |
| TopicPartition tAndP = new TopicPartition(topic, i); |
| offsets.put(tAndP, Pair.of((long) i, i * 10L)); |
| } |
| } |
| |
| Configuration config = new Configuration(false); |
| |
| KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); |
| |
| config.unset("org.apache.crunch.kafka.offsets.topic." + topics.iterator().next() + ".partitions.0.start"); |
| |
| KafkaInputFormat.getOffsets(config); |
| } |
| |
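| /** |
| * Verifies that getOffsets fails with an {@link IllegalStateException} when a partition's end offset is missing |
| * from the configuration. |
| */ |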
| @Test(expected=IllegalStateException.class) |
| public void getOffsetsFromConfigMissingEnd() { |
| Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); |
| Set<String> topics = new HashSet<>(); |
| |
| int numPartitions = 10; |
| int numTopics = 10; |
| for (int j = 0; j < numTopics; j++) { |
| String topic = testName.getMethodName() + ".partitions" + j; |
| topics.add(topic); |
| for (int i = 0; i < numPartitions; i++) { |
| TopicPartition tAndP = new TopicPartition(topic, i); |
| offsets.put(tAndP, Pair.of((long) i, i * 10L)); |
| } |
| } |
| |
| Configuration config = new Configuration(false); |
| |
| KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); |
| |
| config.unset("org.apache.crunch.kafka.offsets.topic." + topics.iterator().next() + ".partitions.0.end"); |
| |
| KafkaInputFormat.getOffsets(config); |
| } |
| } |