/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.crunch.kafka.record;

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.From;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.io.To;
import org.apache.crunch.kafka.ClusterTest;
import org.apache.crunch.kafka.KafkaUtils;
import org.apache.crunch.kafka.utils.KafkaTestUtils;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.matchers.JUnitMatchers.hasItem;
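
/**
 * Integration tests for {@link KafkaSource}: they verify how the consumer's
 * {@code auto.offset.reset} setting is propagated through the input configuration, and that data
 * written to Kafka can be read back both by materializing the source directly and by running it
 * through a full MapReduce pipeline.
 */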
public class KafkaSourceIT {

  @Rule
  public TemporaryPath path = new TemporaryPath();

  @Rule
  public TestName testName = new TestName();

  private Properties consumerProps;
  private String topic;
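
  // The shared test cluster is started once before any test in this class runs and shut down
  // after the last test completes.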
  @BeforeClass
  public static void setup() throws Exception {
    ClusterTest.startTest();
  }

  @AfterClass
  public static void cleanup() throws Exception {
    ClusterTest.endTest();
  }
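
  // Each test writes to its own randomly named topic so test runs do not interfere with one another.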
  @Before
  public void setupTest() {
    topic = UUID.randomUUID().toString();
    consumerProps = ClusterTest.getConsumerProperties();
  }
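
  /**
   * With no explicit {@code auto.offset.reset} in the consumer properties, the source should
   * default the setting to {@code earliest}.
   */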
  @Test
  public void defaultEarliestOffsetReset() {
    Map<TopicPartition, Pair<Long, Long>> offsets = Collections.emptyMap();

    // Remove the property so the source falls back to its default value.
    consumerProps.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);

    KafkaSource kafkaSource = new KafkaSource(consumerProps, offsets);

    FormatBundle inputBundle = kafkaSource.getInputBundle();
    Configuration cfg = new Configuration(false);
    inputBundle.configure(cfg);

    Properties kafkaConnectionProperties = KafkaUtils.getKafkaConnectionProperties(cfg);
    kafkaConnectionProperties = KafkaInputFormat.filterConnectionProperties(kafkaConnectionProperties);

    assertThat(kafkaConnectionProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("earliest"));
  }
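
  /**
   * An explicitly configured {@code auto.offset.reset} value should be preserved rather than
   * replaced by the source's default.
   */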
  @Test
  public void offsetResetOverridable() {
    Map<TopicPartition, Pair<Long, Long>> offsets = Collections.emptyMap();
    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    KafkaSource kafkaSource = new KafkaSource(consumerProps, offsets);

    FormatBundle inputBundle = kafkaSource.getInputBundle();
    Configuration cfg = new Configuration(false);
    inputBundle.configure(cfg);

    Properties kafkaConnectionProperties = KafkaUtils.getKafkaConnectionProperties(cfg);
    kafkaConnectionProperties = KafkaInputFormat.filterConnectionProperties(kafkaConnectionProperties);

    assertThat(kafkaConnectionProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("latest"));
  }
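
  /**
   * Writes data to the test topic and verifies every written key is read back exactly once by
   * materializing the {@link KafkaSource} directly.
   */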
  @Test
  public void sourceReadData() {
    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
    Map<TopicPartition, Long> startOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
    Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getStopOffsets(consumer, topic);

    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
      Long endingOffset = endOffsets.get(entry.getKey());
      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
    }

    Configuration config = ClusterTest.getConf();

    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
    pipeline.enableDebug();

    ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> kafkaSource = new KafkaSource(consumerProps, offsets);

    PCollection<ConsumerRecord<BytesWritable, BytesWritable>> read = pipeline.read(kafkaSource);

    Set<String> keysRead = new HashSet<>();
    int numRecordsFound = 0;
    String currentKey;
    for (ConsumerRecord<BytesWritable, BytesWritable> record : read.materialize()) {
      currentKey = new String(record.key().getBytes());
      assertThat(keys, hasItem(currentKey));
      numRecordsFound++;
      keysRead.add(new String(record.key().getBytes()));
    }

    assertThat(numRecordsFound, is(keys.size()));
    assertThat(keysRead.size(), is(keys.size()));

    pipeline.done();
  }
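
  /**
   * Reads from the {@link KafkaSource}, converts the record keys to strings in a MapReduce
   * pipeline, writes them to a text file, and verifies the persisted keys match what was produced.
   */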
  @Test
  public void sourceReadDataThroughPipeline() {
    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
    Map<TopicPartition, Long> startOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
    Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getStopOffsets(consumer, topic);

    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
      Long endingOffset = endOffsets.get(entry.getKey());
      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
    }

    Configuration config = ClusterTest.getConf();

    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
    pipeline.enableDebug();

    ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> kafkaSource = new KafkaSource(consumerProps, offsets);

    PCollection<ConsumerRecord<BytesWritable, BytesWritable>> read = pipeline.read(kafkaSource);

    Path out = path.getPath("out");
    read.parallelDo(new KafkaSourceIT.SimpleConvertFn(), Avros.strings()).write(To.textFile(out));
    pipeline.run();

    PCollection<String> persistedKeys = pipeline.read(From.textFile(out));

    Set<String> keysRead = new HashSet<>();
    int numRecordsFound = 0;
    for (String value : persistedKeys.materialize()) {
      assertThat(keys, hasItem(value));
      numRecordsFound++;
      keysRead.add(value);
    }

    assertThat(numRecordsFound, is(keys.size()));
    assertThat(keysRead.size(), is(keys.size()));

    pipeline.done();
  }
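
  /**
   * Same as {@link #sourceReadDataThroughPipeline()} but caps the number of records per input
   * split at 7, so partitions are read by multiple splits and no data is lost or duplicated
   * across split boundaries.
   */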
  @Test
  public void sourceReadDataThroughPipelineMultipleSplitsPerPartition() {
    Configuration config = ClusterTest.getConf();
    config.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 7L);

    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
    Map<TopicPartition, Long> startOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
    Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getStopOffsets(consumer, topic);

    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
      Long endingOffset = endOffsets.get(entry.getKey());
      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
    }

    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
    pipeline.enableDebug();

    ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> kafkaSource = new KafkaSource(consumerProps, offsets);

    PCollection<ConsumerRecord<BytesWritable, BytesWritable>> read = pipeline.read(kafkaSource);

    Path out = path.getPath("out");
    read.parallelDo(new KafkaSourceIT.SimpleConvertFn(), Avros.strings()).write(To.textFile(out));
    pipeline.run();

    PCollection<String> persistedKeys = pipeline.read(From.textFile(out));

    Set<String> keysRead = new HashSet<>();
    int numRecordsFound = 0;
    for (String value : persistedKeys.materialize()) {
      assertThat(keys, hasItem(value));
      numRecordsFound++;
      keysRead.add(value);
    }

    assertThat(numRecordsFound, is(keys.size()));
    assertThat(keysRead.size(), is(keys.size()));

    pipeline.done();
  }
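
  /**
   * Converts a consumed Kafka record to a string by decoding its key bytes.
   */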
  private static class SimpleConvertFn extends MapFn<ConsumerRecord<BytesWritable, BytesWritable>, String> {
    @Override
    public String map(ConsumerRecord<BytesWritable, BytesWritable> record) {
      return new String(record.key().getBytes());
    }
  }
}