/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;

/**
 * Tests against {@link JsonKafkaSource} and {@link CheckpointUtils}.
 */
public class TestKafkaSource extends UtilitiesTestBase {
private static final String TEST_TOPIC_NAME = "hoodie_test";
private FilebasedSchemaProvider schemaProvider;
private KafkaTestUtils testUtils;
private HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);

@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initClass();
}

@AfterAll
public static void cleanupClass() {
UtilitiesTestBase.cleanupClass();
}

@BeforeEach
public void setup() throws Exception {
super.setup();
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
testUtils = new KafkaTestUtils();
testUtils.setup();
}

@AfterEach
public void teardown() throws Exception {
super.teardown();
testUtils.teardown();
}
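
/**
 * Builds {@link TypedProperties} for a JSON Kafka source: topic name, broker address,
 * auto.offset.reset strategy, the per-batch maxEvents bound, and a random consumer group id.
 */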
private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource, String resetStrategy) {
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", resetStrategy);
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
String.valueOf(Config.maxEventsFromKafkaSource));
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
return props;
}
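
/**
 * Verifies the basic fetch lifecycle: an empty topic yields no batch, a fresh fetch respects
 * sourceLimit, re-fetching from a previous checkpoint is idempotent, and fetching from the
 * latest checkpoint returns no new data (in both Avro and Row formats).
 */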
@Test
public void testJsonKafkaSource() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(null, "earliest");
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
assertEquals(900, fetch1.getBatch().get().count());
// Test Avro To DataFrame<Row> path
Dataset<Row> fetch1AsRows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
schemaProvider.getSourceSchema().toString(), jsonSource.getSparkSession());
assertEquals(900, fetch1AsRows.count());
// 2. Produce new data, extract new data
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(1100, fetch2.getBatch().get().count());
// 3. Extract with previous checkpoint => gives same data back (idempotent)
InputBatch<JavaRDD<GenericRecord>> fetch3 =
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(fetch2.getBatch().get().count(), fetch3.getBatch().get().count());
assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch());
// Same using Row API
InputBatch<Dataset<Row>> fetch3AsRows =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(fetch2.getBatch().get().count(), fetch3AsRows.getBatch().get().count());
assertEquals(fetch2.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());
// 4. Extract with latest checkpoint => no new data returned
InputBatch<JavaRDD<GenericRecord>> fetch4 =
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());
// Same using Row API
InputBatch<Dataset<Row>> fetch4AsRows =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4AsRows.getBatch());
}

// test case for the Kafka auto.offset.reset strategies ("earliest" vs "latest")
@Test
public void testJsonKafkaSourceResetStrategy() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties earliestProps = createPropsForJsonSource(null, "earliest");
Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter earliestKafkaSource = new SourceFormatAdapter(earliestJsonSource);
TypedProperties latestProps = createPropsForJsonSource(null, "latest");
Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter latestKafkaSource = new SourceFormatAdapter(latestJsonSource);
// 1. Extract from the topic before any data is published and without a prior checkpoint
// => both return a checkpoint string like "hoodie_test,0:0,1:0"; the "latest" checkpoint should equal the "earliest" one
InputBatch<JavaRDD<GenericRecord>> earFetch0 = earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
InputBatch<JavaRDD<GenericRecord>> latFetch0 = latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(earFetch0.getBatch(), latFetch0.getBatch());
assertEquals(earFetch0.getCheckpointForNextBatch(), latFetch0.getCheckpointForNextBatch());
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
// 2. Extract a new checkpoint with a null / empty previous checkpoint
// => the "earliest" fetch with the max source limit reads all of the data and returns an end-offset checkpoint
InputBatch<JavaRDD<GenericRecord>> earFetch1 = earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
// => with a null previous checkpoint, the "latest" reset fetch returns the same end-offset checkpoint as "earliest"
InputBatch<JavaRDD<GenericRecord>> latFetch1 = latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(earFetch1.getCheckpointForNextBatch(), latFetch1.getCheckpointForNextBatch());
}
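
/**
 * Exercises the default upper cap path: the maxEvents property is set to Long.MAX_VALUE,
 * deferring batch sizing to {@code Config.maxEventsFromKafkaSource}.
 */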
@Test
public void testJsonKafkaSourceWithDefaultUpperCap() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
Config.maxEventsFromKafkaSource = 500;
/*
1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
maxEventsFromKafkaSourceProp are set to Long.MAX_VALUE
*/
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(1000, fetch1.getBatch().get().count());
// 2. Produce new data, extract new data based on sourceLimit
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
assertEquals(1000, fetch2.getBatch().get().count());
// reset the static variable back to its default so it does not leak into other tests
Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
}
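
/**
 * Verifies that when sourceLimit is smaller than both the upper cap and the number of records
 * produced, each fetch is bounded by sourceLimit.
 */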
@Test
public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
Config.maxEventsFromKafkaSource = 500;
/*
1. maxEventsFromKafkaSourceProp is set higher than the number of generated insert records,
and sourceLimit is less than the number of generated insert records.
*/
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 400)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 300);
assertEquals(300, fetch1.getBatch().get().count());
/*
2. Produce new data and extract it based on sourceLimit,
where sourceLimit is less than the number of generated insert records.
*/
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 600)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 300);
assertEquals(300, fetch2.getBatch().get().count());
// reset the static variable back to its default so it does not leak into other tests
Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
}
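
/**
 * Verifies the interplay between a configured upper cap (maxEvents = 500) and the per-fetch
 * sourceLimit, along with checkpoint idempotence and the no-new-data case.
 */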
@Test
public void testJsonKafkaSourceWithConfigurableUpperCap() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(500L, "earliest");
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
assertEquals(900, fetch1.getBatch().get().count());
// 2. Produce new data, extract new data based on upper cap
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(500, fetch2.getBatch().get().count());
// fetch data respecting sourceLimit, where the upper cap > sourceLimit
InputBatch<JavaRDD<GenericRecord>> fetch3 =
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), 400);
assertEquals(400, fetch3.getBatch().get().count());
// fetch data respecting sourceLimit, where the upper cap < sourceLimit
InputBatch<JavaRDD<GenericRecord>> fetch4 =
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), 600);
assertEquals(600, fetch4.getBatch().get().count());
// 3. Extract with previous checkpoint => gives same data back (idempotent)
InputBatch<JavaRDD<GenericRecord>> fetch5 =
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(fetch2.getBatch().get().count(), fetch5.getBatch().get().count());
assertEquals(fetch2.getCheckpointForNextBatch(), fetch5.getCheckpointForNextBatch());
// 4. Extract with latest checkpoint => no new data returned
InputBatch<JavaRDD<GenericRecord>> fetch6 =
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch6.getBatch());
}
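
/**
 * Builds a {@link TopicPartition} to offset map for {@code TEST_TOPIC_NAME}; partitions[i] pairs with offsets[i].
 */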
private static HashMap<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
HashMap<TopicPartition, Long> map = new HashMap<>();
for (int i = 0; i < partitions.length; i++) {
map.put(new TopicPartition(TEST_TOPIC_NAME, partitions[i]), offsets[i]);
}
return map;
}
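
/**
 * Unit tests for {@link CheckpointUtils#totalNewMessages} and {@link CheckpointUtils#computeOffsetRanges}:
 * consuming everything under the limit, capping at the limit, picking up new partitions, and
 * allocating a skewed or uneven backlog across partitions.
 */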
@Test
public void testComputeOffsetRanges() {
// test totalNewMessages()
long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[] {OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)});
assertEquals(200, totalMsgs);
// should consume all of the available data since the limit is not reached
OffsetRange[] ranges =
CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 1000000L);
assertEquals(200000, CheckpointUtils.totalNewMessages(ranges));
// should only consume up to the limit
ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 10000);
assertEquals(10000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(200000, ranges[0].fromOffset());
assertEquals(205000, ranges[0].untilOffset());
assertEquals(250000, ranges[1].fromOffset());
assertEquals(255000, ranges[1].untilOffset());
// should also consume from new partitions.
ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
makeOffsetMap(new int[] {0, 1, 2}, new long[] {300000, 350000, 100000}), 1000000L);
assertEquals(300000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(3, ranges.length);
// for skewed offsets, no partition is starved and lagging partitions can catch up
ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 10000}), 100000);
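// partitions 0 and 2 are drained completely (10 and 10000 new messages); partition 1 absorbs the remaining 89990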
assertEquals(100000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(10, ranges[0].count());
assertEquals(89990, ranges[1].count());
assertEquals(10000, ranges[2].count());
ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 10000}), 1000000);
assertEquals(110010, CheckpointUtils.totalNewMessages(ranges));
assertEquals(10, ranges[0].count());
assertEquals(100000, ranges[1].count());
assertEquals(10000, ranges[2].count());
// not all partitions consume the same number of entries
ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {0, 0, 0, 0, 0}),
makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {100, 1000, 1000, 1000, 1000}), 1001);
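// partition 0 has only 100 new messages; the remaining budget of 901 is spread across partitions 1-4 (226 + 226 + 226 + 223)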
assertEquals(1001, CheckpointUtils.totalNewMessages(ranges));
assertEquals(100, ranges[0].count());
assertEquals(226, ranges[1].count());
assertEquals(226, ranges[2].count());
assertEquals(226, ranges[3].count());
assertEquals(223, ranges[4].count());
}
}