/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.flink.connector.kafka.source.reader;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EmptySource;
import org.junit.jupiter.params.provider.NullAndEmptySource;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Unit tests for {@link KafkaPartitionSplitReader}. */
public class KafkaPartitionSplitReaderTest {
private static final int NUM_SUBTASKS = 3;
private static final String TOPIC1 = "topic1";
private static final String TOPIC2 = "topic2";
private static final String TOPIC3 = "topic3";
private static Map<Integer, Map<String, KafkaPartitionSplit>> splitsByOwners;
private static Map<TopicPartition, Long> earliestOffsets;
private final IntegerDeserializer deserializer = new IntegerDeserializer();
@BeforeAll
public static void setup() throws Throwable {
KafkaSourceTestEnv.setup();
KafkaSourceTestEnv.setupTopic(TOPIC1, true, true, KafkaSourceTestEnv::getRecordsForTopic);
KafkaSourceTestEnv.setupTopic(TOPIC2, true, true, KafkaSourceTestEnv::getRecordsForTopic);
KafkaSourceTestEnv.createTestTopic(TOPIC3);
splitsByOwners =
KafkaSourceTestEnv.getSplitsByOwners(Arrays.asList(TOPIC1, TOPIC2), NUM_SUBTASKS);
earliestOffsets =
KafkaSourceTestEnv.getEarliestOffsets(
KafkaSourceTestEnv.getPartitionsForTopics(Arrays.asList(TOPIC1, TOPIC2)));
}
@AfterAll
public static void tearDown() throws Exception {
KafkaSourceTestEnv.tearDown();
}
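    /**
     * Verifies that splits handed over via {@link SplitsAddition} are consumed to completion by
     * repeated {@code fetch()} calls, using two different split assignments on the same reader.
     */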
@Test
public void testHandleSplitChangesAndFetch() throws Exception {
KafkaPartitionSplitReader reader = createReader();
assignSplitsAndFetchUntilFinish(reader, 0);
assignSplitsAndFetchUntilFinish(reader, 1);
}
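    /**
     * Verifies that {@code wakeUp()} can repeatedly interrupt a {@code fetch()} that would
     * otherwise block on a non-existing topic partition, without the fetching thread surfacing
     * an error.
     */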
@Test
public void testWakeUp() throws Exception {
KafkaPartitionSplitReader reader = createReader();
TopicPartition nonExistingTopicPartition = new TopicPartition("NotExist", 0);
assignSplits(
reader,
Collections.singletonMap(
KafkaPartitionSplit.toSplitId(nonExistingTopicPartition),
new KafkaPartitionSplit(nonExistingTopicPartition, 0)));
AtomicReference<Throwable> error = new AtomicReference<>();
Thread t =
new Thread(
() -> {
try {
reader.fetch();
} catch (Throwable e) {
error.set(e);
}
},
"testWakeUp-thread");
t.start();
long deadline = System.currentTimeMillis() + 5000L;
while (t.isAlive() && System.currentTimeMillis() < deadline) {
reader.wakeUp();
Thread.sleep(10);
}
assertThat(error.get()).isNull();
}
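    /**
     * Verifies that assigning a new split right after {@code wakeUp()} does not fail with a
     * WakeupException from the underlying consumer.
     */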
@Test
public void testWakeupThenAssign() throws IOException {
KafkaPartitionSplitReader reader = createReader();
// Assign splits with records
assignSplits(reader, splitsByOwners.get(0));
// Run a fetch operation, and it should not block
reader.fetch();
// Wake the reader up then assign a new split. This assignment should not throw
// WakeupException.
reader.wakeUp();
TopicPartition tp = new TopicPartition(TOPIC1, 0);
assignSplits(
reader,
Collections.singletonMap(
KafkaPartitionSplit.toSplitId(tp),
new KafkaPartitionSplit(tp, KafkaPartitionSplit.EARLIEST_OFFSET)));
}
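    /**
     * Verifies that the numBytesIn counter of the operator I/O metric group increases after each
     * fetch. Only monotonic growth is asserted because the exact number of consumed bytes is hard
     * to predict.
     */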
@Test
public void testNumBytesInCounter() throws Exception {
final OperatorMetricGroup operatorMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
final Counter numBytesInCounter =
operatorMetricGroup.getIOMetricGroup().getNumBytesInCounter();
KafkaPartitionSplitReader reader =
createReader(
new Properties(),
InternalSourceReaderMetricGroup.wrap(operatorMetricGroup));
// Add a split
reader.handleSplitsChanges(
new SplitsAddition<>(
Collections.singletonList(
new KafkaPartitionSplit(new TopicPartition(TOPIC1, 0), 0L))));
reader.fetch();
final long latestNumBytesIn = numBytesInCounter.getCount();
// Since it's hard to know the exact number of bytes consumed, we just check if it is
// greater than 0
assertThat(latestNumBytesIn).isGreaterThan(0L);
// Add another split
reader.handleSplitsChanges(
new SplitsAddition<>(
Collections.singletonList(
new KafkaPartitionSplit(new TopicPartition(TOPIC2, 0), 0L))));
reader.fetch();
// We just check if numBytesIn is increasing
assertThat(numBytesInCounter.getCount()).isGreaterThan(latestNumBytesIn);
}
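    /**
     * Verifies the pendingRecords gauge: it is registered lazily on the first fetch and then
     * reflects the number of remaining records per split while records are polled one at a time
     * (max.poll.records is set to 1). The topic suffix parameter also covers topic names with
     * special characters.
     */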
@ParameterizedTest
@EmptySource
@ValueSource(strings = {"_underscore.period-minus"})
public void testPendingRecordsGauge(String topicSuffix) throws Throwable {
final String topic1Name = TOPIC1 + topicSuffix;
final String topic2Name = TOPIC2 + topicSuffix;
if (!topicSuffix.isEmpty()) {
KafkaSourceTestEnv.setupTopic(
topic1Name, true, true, KafkaSourceTestEnv::getRecordsForTopic);
KafkaSourceTestEnv.setupTopic(
topic2Name, true, true, KafkaSourceTestEnv::getRecordsForTopic);
}
MetricListener metricListener = new MetricListener();
final Properties props = new Properties();
props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
KafkaPartitionSplitReader reader =
createReader(
props,
InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
// Add a split
reader.handleSplitsChanges(
new SplitsAddition<>(
Collections.singletonList(
new KafkaPartitionSplit(new TopicPartition(topic1Name, 0), 0L))));
        // pendingRecords should not have been registered yet because it is registered lazily
assertThat(metricListener.getGauge(MetricNames.PENDING_RECORDS)).isNotPresent();
// Trigger first fetch
reader.fetch();
final Optional<Gauge<Long>> pendingRecords =
metricListener.getGauge(MetricNames.PENDING_RECORDS);
        assertThat(pendingRecords).isPresent();
        // Validate pendingRecords
        assertThat((long) pendingRecords.get().getValue()).isEqualTo(NUM_RECORDS_PER_PARTITION - 1);
for (int i = 1; i < NUM_RECORDS_PER_PARTITION; i++) {
reader.fetch();
assertThat((long) pendingRecords.get().getValue())
.isEqualTo(NUM_RECORDS_PER_PARTITION - i - 1);
}
// Add another split
reader.handleSplitsChanges(
new SplitsAddition<>(
Collections.singletonList(
new KafkaPartitionSplit(new TopicPartition(topic2Name, 0), 0L))));
// Validate pendingRecords
for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
reader.fetch();
assertThat((long) pendingRecords.get().getValue())
.isEqualTo(NUM_RECORDS_PER_PARTITION - i - 1);
}
}
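    /**
     * Verifies that splits which contain no records to consume (starting offset already at the
     * stopping offset) are reported as finished by the next fetch and are not reported again on
     * subsequent fetches.
     */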
@Test
public void testAssignEmptySplit() throws Exception {
KafkaPartitionSplitReader reader = createReader();
final KafkaPartitionSplit normalSplit =
new KafkaPartitionSplit(
new TopicPartition(TOPIC1, 0),
KafkaPartitionSplit.EARLIEST_OFFSET,
KafkaPartitionSplit.NO_STOPPING_OFFSET);
final KafkaPartitionSplit emptySplit =
new KafkaPartitionSplit(
new TopicPartition(TOPIC2, 0),
KafkaPartitionSplit.LATEST_OFFSET,
KafkaPartitionSplit.LATEST_OFFSET);
final KafkaPartitionSplit emptySplitWithZeroStoppingOffset =
new KafkaPartitionSplit(new TopicPartition(TOPIC3, 0), 0, 0);
reader.handleSplitsChanges(
new SplitsAddition<>(
Arrays.asList(normalSplit, emptySplit, emptySplitWithZeroStoppingOffset)));
        // Fetch and check that the empty splits are added to the finished splits
RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsWithSplitIds = reader.fetch();
assertThat(recordsWithSplitIds.finishedSplits()).contains(emptySplit.splitId());
assertThat(recordsWithSplitIds.finishedSplits())
.contains(emptySplitWithZeroStoppingOffset.splitId());
// Assign another valid split to avoid consumer.poll() blocking
final KafkaPartitionSplit anotherNormalSplit =
new KafkaPartitionSplit(
new TopicPartition(TOPIC1, 1),
KafkaPartitionSplit.EARLIEST_OFFSET,
KafkaPartitionSplit.NO_STOPPING_OFFSET);
reader.handleSplitsChanges(
new SplitsAddition<>(Collections.singletonList(anotherNormalSplit)));
        // Fetch again and check that the previously finished empty splits are not reported again
recordsWithSplitIds = reader.fetch();
assertThat(recordsWithSplitIds.finishedSplits()).isEmpty();
}
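    /**
     * Verifies that starting a split from {@link KafkaPartitionSplit#COMMITTED_OFFSET} fails fast
     * when the consumer group has no committed offset and the offset reset strategy is "none".
     */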
@Test
public void testUsingCommittedOffsetsWithNoneOffsetResetStrategy() {
final Properties props = new Properties();
props.setProperty(
ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-none-offset-reset");
KafkaPartitionSplitReader reader =
createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
        // The split starts from the committed offset, but the group does not actually have a
        // committed offset, and the offset reset strategy is "none" (throw an exception to the
        // consumer if no previous offset is found for the consumer's group), so handling the
        // split addition is expected to fail with an exception about the missing committed offset.
assertThatThrownBy(
() ->
reader.handleSplitsChanges(
new SplitsAddition<>(
Collections.singletonList(
new KafkaPartitionSplit(
new TopicPartition(TOPIC1, 0),
KafkaPartitionSplit
.COMMITTED_OFFSET)))))
.isInstanceOf(KafkaException.class)
.hasMessageContaining("Undefined offset with no reset policy for partition");
}
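    /**
     * Verifies that starting a split from {@link KafkaPartitionSplit#COMMITTED_OFFSET} falls back
     * to the configured offset reset strategy when the group has no committed offset: "earliest"
     * resolves to the beginning and "latest" to the end of the partition.
     */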
@ParameterizedTest
@CsvSource({"earliest, 0", "latest, 10"})
public void testUsingCommittedOffsetsWithEarliestOrLatestOffsetResetStrategy(
String offsetResetStrategy, Long expectedOffset) {
final Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetResetStrategy);
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset");
KafkaPartitionSplitReader reader =
createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
// Add committed offset split
final TopicPartition partition = new TopicPartition(TOPIC1, 0);
reader.handleSplitsChanges(
new SplitsAddition<>(
Collections.singletonList(
new KafkaPartitionSplit(
partition, KafkaPartitionSplit.COMMITTED_OFFSET))));
// Verify that the current offset of the consumer is the expected offset
assertThat(reader.consumer().position(partition)).isEqualTo(expectedOffset);
}
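    /**
     * Verifies that {@code setConsumerClientRack} writes the given rack id into the consumer
     * properties under {@code ConsumerConfig.CLIENT_RACK_CONFIG}.
     */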
@Test
public void testConsumerClientRackSupplier() {
String rackId = "use1-az1";
Properties properties = new Properties();
KafkaPartitionSplitReader reader =
createReader(
properties,
UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
rackId);
// Here we call the helper function directly, because the KafkaPartitionSplitReader
// doesn't allow us to examine the final ConsumerConfig object
reader.setConsumerClientRack(properties, rackId);
assertThat(properties.get(ConsumerConfig.CLIENT_RACK_CONFIG)).isEqualTo(rackId);
}
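    /**
     * Verifies that {@code setConsumerClientRack} leaves the consumer properties untouched when
     * the rack id is null or empty.
     */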
@ParameterizedTest
@NullAndEmptySource
public void testSetConsumerClientRackIgnoresNullAndEmpty(String rackId) {
Properties properties = new Properties();
KafkaPartitionSplitReader reader =
createReader(
properties,
UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
rackId);
// Here we call the helper function directly, because the KafkaPartitionSplitReader
// doesn't allow us to examine the final ConsumerConfig object
reader.setConsumerClientRack(properties, rackId);
assertThat(properties.containsKey(ConsumerConfig.CLIENT_RACK_CONFIG)).isFalse();
}
// ------------------
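    /**
     * Assigns the given reader's splits and keeps fetching until every split has been reported as
     * finished, verifying offsets, values and timestamps of the consumed records as well as the
     * total number of records consumed from each split.
     */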
private void assignSplitsAndFetchUntilFinish(KafkaPartitionSplitReader reader, int readerId)
throws IOException {
Map<String, KafkaPartitionSplit> splits =
assignSplits(reader, splitsByOwners.get(readerId));
Map<String, Integer> numConsumedRecords = new HashMap<>();
Set<String> finishedSplits = new HashSet<>();
while (finishedSplits.size() < splits.size()) {
RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsBySplitIds = reader.fetch();
String splitId = recordsBySplitIds.nextSplit();
while (splitId != null) {
// Collect the records in this split.
List<ConsumerRecord<byte[], byte[]>> splitFetch = new ArrayList<>();
ConsumerRecord<byte[], byte[]> record;
while ((record = recordsBySplitIds.nextRecordFromSplit()) != null) {
splitFetch.add(record);
}
// Compute the expected next offset for the split.
TopicPartition tp = splits.get(splitId).getTopicPartition();
long earliestOffset = earliestOffsets.get(tp);
int numConsumedRecordsForSplit = numConsumedRecords.getOrDefault(splitId, 0);
long expectedStartingOffset = earliestOffset + numConsumedRecordsForSplit;
                // Verify the consumed records.
if (verifyConsumed(splits.get(splitId), expectedStartingOffset, splitFetch)) {
finishedSplits.add(splitId);
}
numConsumedRecords.compute(
splitId,
(ignored, recordCount) ->
recordCount == null
? splitFetch.size()
: recordCount + splitFetch.size());
splitId = recordsBySplitIds.nextSplit();
}
}
// Verify the number of records consumed from each split.
numConsumedRecords.forEach(
(splitId, recordCount) -> {
TopicPartition tp = splits.get(splitId).getTopicPartition();
long earliestOffset = earliestOffsets.get(tp);
long expectedRecordCount = NUM_RECORDS_PER_PARTITION - earliestOffset;
assertThat((long) recordCount)
.as(
String.format(
"%s should have %d records.",
splits.get(splitId), expectedRecordCount))
.isEqualTo(expectedRecordCount);
});
}
// ------------------
private KafkaPartitionSplitReader createReader() {
return createReader(
new Properties(), UnregisteredMetricsGroup.createSourceReaderMetricGroup());
}
private KafkaPartitionSplitReader createReader(
Properties additionalProperties, SourceReaderMetricGroup sourceReaderMetricGroup) {
return createReader(additionalProperties, sourceReaderMetricGroup, null);
}
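    /**
     * Creates a {@link KafkaPartitionSplitReader} against the test Kafka cluster with
     * auto.offset.reset set to "none", merging in the given additional properties and passing
     * the (possibly null) rack id through to the reader.
     */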
private KafkaPartitionSplitReader createReader(
Properties additionalProperties,
SourceReaderMetricGroup sourceReaderMetricGroup,
String rackId) {
Properties props = new Properties();
props.putAll(KafkaSourceTestEnv.getConsumerProperties(ByteArrayDeserializer.class));
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
if (!additionalProperties.isEmpty()) {
props.putAll(additionalProperties);
}
KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
new KafkaSourceReaderMetrics(sourceReaderMetricGroup);
return new KafkaPartitionSplitReader(
props,
new TestingReaderContext(new Configuration(), sourceReaderMetricGroup),
kafkaSourceReaderMetrics,
rackId);
}
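    /** Hands the given splits to the reader as a single {@link SplitsAddition} and returns them. */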
private Map<String, KafkaPartitionSplit> assignSplits(
KafkaPartitionSplitReader reader, Map<String, KafkaPartitionSplit> splits) {
SplitsChange<KafkaPartitionSplit> splitsChange =
new SplitsAddition<>(new ArrayList<>(splits.values()));
reader.handleSplitsChanges(splitsChange);
return splits;
}
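    /**
     * Checks that the consumed records form a contiguous range starting at the expected offset,
     * with values and timestamps derived from the record offsets, and returns whether the split's
     * stopping offset has been reached.
     */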
private boolean verifyConsumed(
final KafkaPartitionSplit split,
final long expectedStartingOffset,
final Collection<ConsumerRecord<byte[], byte[]>> consumed) {
long expectedOffset = expectedStartingOffset;
for (ConsumerRecord<byte[], byte[]> record : consumed) {
int expectedValue = (int) expectedOffset;
long expectedTimestamp = expectedOffset * 1000L;
assertThat(deserializer.deserialize(record.topic(), record.value()))
.isEqualTo(expectedValue);
assertThat(record.offset()).isEqualTo(expectedOffset);
assertThat(record.timestamp()).isEqualTo(expectedTimestamp);
expectedOffset++;
}
if (split.getStoppingOffset().isPresent()) {
return expectedOffset == split.getStoppingOffset().get();
} else {
return false;
}
}
}