/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources.debezium;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS;

/**
* Base class for Debezium streaming sources, which expect change events as Avro records in Kafka.
* The latest schema is obtained from the Confluent schema registry.
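*
* <p>Key configuration: the schema registry URL ({@code HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL}),
* the Kafka value deserializer class ({@code KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS}), optional
* committing of offsets back to Kafka ({@code KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET}), and an optional
* checkpoint override ({@code hoodie.debezium.override.initial.checkpoint.key}).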
*/
public abstract class DebeziumSource extends RowSource {
private static final Logger LOG = LoggerFactory.getLogger(DebeziumSource.class);
// These are native Kafka config keys; do not change the names.
private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
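// If this property is set, its value is returned as the checkpoint string instead of the consumed Kafka offsets (see fetchNextBatch).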
private static final String OVERRIDE_CHECKPOINT_STRING = "hoodie.debezium.override.initial.checkpoint.key";
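// Debezium Avro schemas expose the Kafka Connect schema name via the "connect.name" property; fields whose
// schema name matches DATE_CONNECT_NAME are cast to Spark DateType in convertDateColumns().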
private static final String CONNECT_NAME_KEY = "connect.name";
private static final String DATE_CONNECT_NAME = "custom.debezium.DateString";
private final KafkaOffsetGen offsetGen;
private final HoodieIngestionMetrics metrics;
private final SchemaRegistryProvider schemaRegistryProvider;
private final String deserializerClassName;
public DebeziumSource(TypedProperties props, JavaSparkContext sparkContext,
SparkSession sparkSession,
SchemaProvider schemaProvider,
HoodieIngestionMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider);
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
deserializerClassName = getStringWithAltKeys(props, KAFKA_AVRO_VALUE_DESERIALIZER_CLASS, true);
try {
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
} catch (ClassNotFoundException e) {
String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
LOG.error(error);
throw new HoodieReadFromSourceException(error, e);
}
// Currently, the Debezium source requires a Confluent/Kafka schema registry to fetch the latest schema.
if (schemaProvider == null || !(schemaProvider instanceof SchemaRegistryProvider)) {
schemaRegistryProvider = new SchemaRegistryProvider(props, sparkContext);
} else {
schemaRegistryProvider = (SchemaRegistryProvider) schemaProvider;
}
offsetGen = new KafkaOffsetGen(props);
this.metrics = metrics;
}

@Override
protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
String overrideCheckpointStr = props.getString(OVERRIDE_CHECKPOINT_STRING, "");
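// Compute the Kafka offset ranges to consume in this batch, bounded by sourceLimit.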
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCkptStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs == 0) {
// If there are no new messages, return an empty dataframe with no schema, since the schema from the
// schema registry can only be considered up to date if a change event has occurred.
return Pair.of(Option.of(sparkSession.emptyDataFrame()), overrideCheckpointStr.isEmpty() ? CheckpointUtils.offsetsToStr(offsetRanges) : overrideCheckpointStr);
} else {
try {
String schemaStr = schemaRegistryProvider.fetchSchemaFromRegistry(getStringWithAltKeys(props, HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
Dataset<Row> dataset = toDataset(offsetRanges, offsetGen, schemaStr);
LOG.info(String.format("Spark schema of Kafka Payload for topic %s:\n%s", offsetGen.getTopicName(), dataset.schema().treeString()));
LOG.info(String.format("New checkpoint string: %s", CheckpointUtils.offsetsToStr(offsetRanges)));
return Pair.of(Option.of(dataset), overrideCheckpointStr.isEmpty() ? CheckpointUtils.offsetsToStr(offsetRanges) : overrideCheckpointStr);
} catch (Exception e) {
LOG.error("Fatal error reading and parsing incoming debezium event", e);
throw new HoodieReadFromSourceException("Fatal error reading and parsing incoming debezium event", e);
}
}
}

/**
* Debezium Kafka payloads have a nested structure; subclasses flatten them in a way specific to the source database type.
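* <p>For illustration only (the exact fields and flattening depend on the Debezium connector and the
* database-specific subclass), an implementation might select fields along the lines of
* {@code rawKafkaData.selectExpr("op", "ts_ms", "after.*")}.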
*
* @param rawKafkaData Dataset of Debezium CDC events read from Kafka
* @return A flattened dataset
*/
protected abstract Dataset<Row> processDataset(Dataset<Row> rawKafkaData);

/**
* Converts a range of Kafka topic offsets into a Spark dataset.
*
* @param offsetRanges Kafka offset ranges to read
* @param offsetGen KafkaOffsetGen providing the Kafka consumer params
* @param schemaStr Avro schema string fetched from the schema registry
* @return Spark dataset of the flattened, type-converted Debezium events
*/
private Dataset<Row> toDataset(OffsetRange[] offsetRanges, KafkaOffsetGen offsetGen, String schemaStr) {
AvroConvertor convertor = new AvroConvertor(schemaStr);
Dataset<Row> kafkaData;
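// Depending on the configured value deserializer, the Kafka payload arrives either as a JSON string
// (StringDeserializer), which is converted to Avro via AvroConvertor, or directly as an Avro GenericRecord.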
if (deserializerClassName.equals(StringDeserializer.class.getName())) {
kafkaData = AvroConversionUtils.createDataFrame(
KafkaUtils.<String, String>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent())
.map(obj -> convertor.fromJson(obj.value()))
.rdd(), schemaStr, sparkSession);
} else {
kafkaData = AvroConversionUtils.createDataFrame(
KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent())
.map(obj -> (GenericRecord) obj.value())
.rdd(), schemaStr, sparkSession);
}
// Flatten the Debezium payload in a way specific to each database type (Postgres, MySQL, etc.).
Dataset<Row> debeziumDataset = processDataset(kafkaData);
// Apply the transformations required to convert Debezium data types into Spark-supported types.
return convertArrayColumnsToString(convertColumnToNullable(sparkSession,
convertDateColumns(debeziumDataset, new Schema.Parser().parse(schemaStr))));
}

/**
* Converts string-formatted date columns into Spark date columns.
*
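* <p>A field is treated as a date column when its Avro schema (or any member of its union schema) carries the
* property {@code connect.name = custom.debezium.DateString}; each matching column is cast to Spark's DateType.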
* @param dataset Spark dataset
* @param schema Avro schema from Debezium
* @return Converted dataset
*/
public static Dataset<Row> convertDateColumns(Dataset<Row> dataset, Schema schema) {
if (schema.getField("before") != null) {
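// The "before" field is a union schema; the code assumes its second member (index 1) is the record schema
// whose fields describe the table columns.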
List<String> dateFields = schema.getField("before")
.schema()
.getTypes()
.get(1)
.getFields()
.stream()
.filter(field -> {
if (field.schema().getType() == Type.UNION) {
return field.schema().getTypes().stream().anyMatch(
schemaInUnion -> DATE_CONNECT_NAME.equals(schemaInUnion.getProp(CONNECT_NAME_KEY))
);
} else {
return DATE_CONNECT_NAME.equals(field.schema().getProp(CONNECT_NAME_KEY));
}
}).map(Field::name).collect(Collectors.toList());
LOG.info("Date fields: " + dateFields.toString());
for (String dateCol : dateFields) {
dataset = dataset.withColumn(dateCol, functions.col(dateCol).cast(DataTypes.DateType));
}
}
return dataset;
}

/**
* Utility function for converting all columns to nullable. This is needed so the dataframe schema matches
* the nullable columns present in Debezium change events.
*
* @param sparkSession SparkSession object
* @param dataset Dataframe to modify
* @return Modified dataframe
*/
private static Dataset<Row> convertColumnToNullable(SparkSession sparkSession, Dataset<Row> dataset) {
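// Rebuild every top-level field with nullable = true (keeping name, type, and metadata) and re-create the
// dataframe against the relaxed schema.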
List<String> columns = Arrays.asList(dataset.columns());
StructField[] modifiedStructFields = Arrays.stream(dataset.schema().fields()).map(field -> columns
.contains(field.name()) ? new StructField(field.name(), field.dataType(), true, field.metadata()) : field)
.toArray(StructField[]::new);
return sparkSession.createDataFrame(dataset.rdd(), new StructType(modifiedStructFields));
}

/**
* Converts array-typed columns to string columns, because not all Debezium array columns can be converted
* to Spark array columns.
*
* @param dataset Dataframe to modify
* @return Modified dataframe
*/
private static Dataset<Row> convertArrayColumnsToString(Dataset<Row> dataset) {
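// Find all array-typed columns from the Spark schema and cast each of them to its string representation.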
List<String> arrayColumns = Arrays.stream(dataset.schema().fields())
.filter(field -> field.dataType().typeName().toLowerCase().startsWith("array"))
.map(StructField::name)
.collect(Collectors.toList());
for (String colName : arrayColumns) {
dataset = dataset.withColumn(colName, functions.col(colName).cast(DataTypes.StringType));
}
return dataset;
}

@Override
public void onCommit(String lastCkptStr) {
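// When offset committing is enabled, persist the processed offsets back to the Kafka consumer group.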
if (getBooleanWithAltKeys(this.props, KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET)) {
offsetGen.commitOffsetToKafka(lastCkptStr);
}
}
}