/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the
* Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.apache.crunch.kafka.record;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.DisconnectException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.when;
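
/**
 * Tests for {@link KafkaRecordReader}: retry behaviour when polling, offset tracking, and
 * detection of the end of a split.
 */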
@RunWith(MockitoJUnitRunner.class)
public class KafkaRecordReaderTest {
@Mock
private KafkaConsumer<String, String> consumer;
@Mock
private TaskAttemptContext taskAttemptContext;
private TopicPartition topicPartition;
private long startOffset;
private long endOffset;
private KafkaInputSplit inputSplit;
private ConsumerRecords<String, String> records;
private KafkaRecordReader<String, String> reader;
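
  // Fixture: a split covering offsets [0, 100) of partition "topic"-0, with the mocked consumer
  // stubbed to return a single record at offset 0.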
@Before
public void before() throws IOException, InterruptedException {
when(taskAttemptContext.getConfiguration()).thenReturn(new Configuration(false));
startOffset = 0L;
endOffset = 100L;
topicPartition = new TopicPartition("topic", 0);
inputSplit = new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), startOffset, endOffset);
records = new ConsumerRecords<>(Collections.singletonMap(inputSplit.getTopicPartition(),
Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, "key", "value"))));
when(consumer.poll(anyLong())).thenReturn(records);
reader = new KafkaRecordReaderTester();
reader.initialize(inputSplit, taskAttemptContext);
}
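
  // A retriable poll failure should be swallowed and the records from the subsequent poll
  // exposed through the iterator.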
@Test
public void getRecords_consumerPollThrowsException_thenReturnsMessage() {
// DisconnectException is retriable
when(consumer.poll(anyLong())).thenThrow(new DisconnectException()).thenReturn(records);
reader.loadRecords();
Iterator<ConsumerRecord<String, String>> iterator = reader.getRecordIterator();
assertThat(iterator.hasNext(), is(true));
assertThat(iterator.next(), is(records.records(topicPartition).get(0)));
}
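
  // An empty poll result should be retried; the records from the subsequent poll are exposed
  // through the iterator.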
@Test
public void getRecords_consumerPollEmpty_thenReturnsMessage() {
    // the first poll returns no records, so the reader should poll again
when(consumer.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition,
List<ConsumerRecord<String, String>>> emptyMap())).thenReturn(records);
reader.loadRecords();
Iterator<ConsumerRecord<String, String>> iterator = reader.getRecordIterator();
assertThat(iterator.hasNext(), is(true));
assertThat(iterator.next(), is(records.records(topicPartition).get(0)));
}
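
  // Happy path: the single stubbed record becomes the current key and its offset is reported.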
@Test
public void nextKeyValue() throws IOException, InterruptedException {
assertThat(reader.nextKeyValue(), is(true));
assertThat(reader.getCurrentKey(), is(records.records(topicPartition).get(0)));
assertThat(reader.getCurrentOffset(), is(0L));
}
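
  // A record whose offset is ahead of the next expected offset is still returned, and the
  // current offset jumps to it.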
@Test
public void nextKeyValue_recordOffsetAheadOfExpected() throws IOException, InterruptedException {
records = new ConsumerRecords<>(Collections.singletonMap(inputSplit.getTopicPartition(),
Collections.singletonList(new ConsumerRecord<>("topic", 0, 10L, "key", "value"))));
when(consumer.poll(anyLong())).thenReturn(records);
assertThat(reader.nextKeyValue(), is(true));
assertThat(reader.getCurrentKey(), is(records.records(topicPartition).get(0)));
assertThat(reader.getCurrentOffset(), is(10L));
}
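
  // No records and the earliest available offset already equals the split's end offset: the
  // split is treated as exhausted rather than failing.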
@Test
public void nextKeyValue_noRecord_emptyPartition() throws IOException, InterruptedException {
when(consumer.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition,
List<ConsumerRecord<String, String>>> emptyMap()));
when(consumer.beginningOffsets(Collections.singletonList(topicPartition))).thenReturn(
Collections.singletonMap(topicPartition, endOffset));
assertThat(reader.nextKeyValue(), is(false));
}
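
  // No records even though the partition should still hold data in the split's range: reading
  // fails with an IOException.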
@Test(expected = IOException.class)
public void nextKeyValue_noRecord_nonEmptyPartition() throws IOException, InterruptedException {
when(consumer.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition,
List<ConsumerRecord<String, String>>> emptyMap()));
reader.nextKeyValue();
}
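
  // A record at the split's (exclusive) end offset signals there is nothing left to read.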
@Test
public void nextKeyValue_recordIsBeyondEndOffset() throws IOException, InterruptedException {
records = new ConsumerRecords<>(Collections.singletonMap(inputSplit.getTopicPartition(),
Collections.singletonList(new ConsumerRecord<>("topic", 0, 100L, "key", "value"))));
when(consumer.poll(anyLong())).thenReturn(records);
assertThat(reader.nextKeyValue(), is(false));
}
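
  // When the broker reports no beginning offset for the partition, the earliest offset defaults to 0.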
@Test
public void getEarliestOffset_noOffsetFound() {
when(consumer.beginningOffsets(Collections.singletonList(inputSplit.getTopicPartition()))).thenReturn(
Collections.<TopicPartition, Long> emptyMap());
assertThat(reader.getEarliestOffset(), is(0L));
}
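
  // The beginning offset reported by the broker is returned as the earliest offset.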
@Test
public void getEarliestOffset() {
when(consumer.beginningOffsets(Collections.singletonList(inputSplit.getTopicPartition()))).thenReturn(
Collections.singletonMap(inputSplit.getTopicPartition(), 100L));
assertThat(reader.getEarliestOffset(), is(100L));
}
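
  // Test subclass that returns the mocked consumer instead of building a real KafkaConsumer.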
private class KafkaRecordReaderTester extends KafkaRecordReader<String, String> {
@Override
protected KafkaConsumer<String, String> buildConsumer(Properties properties) {
return consumer;
}
}
}