blob: c07024810017ac5a9ad4d3706becf93155a21cc0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.index.SparkHoodieIndexFactory
import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.{SPARK_VERSION, SparkContext}
import scala.collection.JavaConversions._
import scala.collection.mutable
object HoodieSparkSqlWriter {
private val log = LogManager.getLogger(getClass)
private var tableExists: Boolean = false
private var asyncCompactionTriggerFnDefined: Boolean = false
private var asyncClusteringTriggerFnDefined: Boolean = false
def write(sqlContext: SQLContext,
mode: SaveMode,
optParams: Map[String, String],
df: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty)
: (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
val path = optParams("path")
val basePath = new Path(path)
val sparkContext = sqlContext.sparkContext
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
originKeyGeneratorClassName, parameters)
//validate datasource and tableconfig keygen are the same
validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)),
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined
sparkContext.getConf.getOption("spark.serializer") match {
case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
}
val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
// Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly
// or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) .
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
operation == WriteOperationType.UPSERT) {
log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
s"when $INSERT_DROP_DUPS is set to be true, " +
s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
operation = WriteOperationType.INSERT
}
val jsc = new JavaSparkContext(sparkContext)
if (asyncCompactionTriggerFn.isDefined) {
if (jsc.getConf.getOption(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY).isDefined) {
jsc.setLocalProperty("spark.scheduler.pool", SparkConfigs.SPARK_DATASOURCE_WRITER_POOL_NAME)
}
}
val instantTime = HoodieActiveTimeline.createNewInstantTime()
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieConfig.getProps))
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
(false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
} else {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
val partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
// Create the table if not present
if (!tableExists) {
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT);
val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
.setDatabaseName(databaseName)
.setTableName(tblName)
.setRecordKeyFields(recordKeyFields)
.setBaseFileFormat(baseFileFormat)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
// we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
// but we are interested in what user has set, hence fetching from optParams.
.setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
.setKeyGeneratorClassProp(originKeyGeneratorClassName)
.set(timestampKeyGeneratorConfigs)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig
}
val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
val dropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
// short-circuit if bulk_insert via row is enabled.
// scalastyle:off
if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) &&
operation == WriteOperationType.BULK_INSERT) {
val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
basePath, path, instantTime, partitionColumns)
return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
}
// scalastyle:on
val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
val (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) =
operation match {
case WriteOperationType.DELETE => {
val genericRecords = registerKryoClassesAndGetGenericRecords(tblName, sparkContext, df, reconcileSchema)
// Convert to RDD[HoodieKey]
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
if (!tableExists) {
throw new HoodieException(s"hoodie table at $basePath does not exist")
}
// Create a HoodieWriteClient & issue the delete.
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
}
if (isAsyncClusteringEnabled(client, parameters)) {
asyncClusteringTriggerFn.get.apply(client)
}
// Issue deletes
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
(writeStatuses, client)
}
case WriteOperationType.DELETE_PARTITION => {
if (!tableExists) {
throw new HoodieException(s"hoodie table at $basePath does not exist")
}
// Get list of partitions to delete
val partitionsToDelete = if (parameters.containsKey(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key())) {
val partitionColsToDelete = parameters(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).split(",")
java.util.Arrays.asList(partitionColsToDelete: _*)
} else {
val genericRecords = registerKryoClassesAndGetGenericRecords(tblName, sparkContext, df, reconcileSchema)
genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
}
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
// Issue delete partitions
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
(writeStatuses, client)
}
case _ => { // any other operation
// register classes & schemas
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
// TODO(HUDI-4472) revisit and simplify schema handling
val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema)
val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
val writerSchema: Schema =
if (reconcileSchema) {
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
}
if (internalSchemaOpt.isDefined) {
// Apply schema evolution, by auto-merging write schema and read schema
val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getName)
} else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
// In case schema reconciliation is enabled and source and latest table schemas
// are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]], then we will
// pick latest table's schema as the writer's schema
latestTableSchema
} else {
// Otherwise fallback to original source's schema
sourceSchema
}
} else {
// In case reconciliation is disabled, we still have to do nullability attributes
// (minor) reconciliation, making sure schema of the incoming batch is in-line with
// the data already committed in the table
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
}
validateSchemaForHoodieIsDeleted(writerSchema)
sparkContext.getConf.registerAvroSchemas(writerSchema)
log.info(s"Registered avro schema : ${writerSchema.toString(true)}")
// Convert to RDD[HoodieRecord]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
org.apache.hudi.common.util.Option.of(writerSchema))
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT) ||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
val hoodieRecord = if (shouldCombine) {
val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(processedRecord,
orderingVal,
keyGenerator.getKey(gr),
hoodieConfig.getString(PAYLOAD_CLASS_NAME))
} else {
DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
}
hoodieRecord
}).toJavaRDD()
val writerDataSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema
// Create a HoodieWriteClient & issue the write.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,
tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
}
if (isAsyncClusteringEnabled(client, parameters)) {
asyncClusteringTriggerFn.get.apply(client)
}
val hoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
} else {
hoodieAllIncomingRecords
}
client.startCommitWithTime(instantTime, commitActionType)
val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
(writeResult, client)
}
}
// Check for errors and commit the write.
val (writeSuccessful, compactionInstant, clusteringInstant) =
commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
writeResult, parameters, writeClient, tableConfig, jsc,
TableInstantInfo(basePath, instantTime, commitActionType, operation))
(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
}
}
def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: Schema): Schema = {
val fieldsToRemove = new java.util.ArrayList[String]()
partitionParam.split(",").map(partitionField => partitionField.trim)
.filter(s => !s.isEmpty).map(field => fieldsToRemove.add(field))
HoodieAvroUtils.removeFields(schema, fieldsToRemove)
}
def getProcessedRecord(partitionParam: String, record: GenericRecord,
dropPartitionColumns: Boolean): GenericRecord = {
var processedRecord = record
if (dropPartitionColumns) {
val writeSchema = generateSchemaWithoutPartitionColumns(partitionParam, record.getSchema)
processedRecord = HoodieAvroUtils.rewriteRecord(record, writeSchema)
}
processedRecord
}
def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = {
val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false"
parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(internalSchemaOpt.getOrElse(null)),
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable)
}
/**
* get latest internalSchema from table
*
* @param fs instance of FileSystem.
* @param basePath base path.
* @param sparkContext instance of spark context.
* @param schema incoming record's schema.
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = {
try {
if (FSUtils.isTableExists(basePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata
if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None
} else {
None
}
} catch {
case _: Exception => None
}
}
/**
* Checks if schema needs upgrade (if incoming record's write schema is old while table schema got evolved).
*
* @param fs instance of FileSystem.
* @param basePath base path.
* @param sparkContext instance of spark context.
* @param schema incoming record's schema.
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[Schema] = {
if (FSUtils.isTableExists(basePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder
.setConf(sparkContext.hadoopConfiguration)
.setBasePath(basePath.toString)
.build()
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
} else {
None
}
}
def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext: SparkContext, df: Dataset[Row],
reconcileSchema: Boolean): RDD[GenericRecord] = {
val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName}"
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema)
}
def bootstrap(sqlContext: SQLContext,
mode: SaveMode,
optParams: Map[String, String],
df: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {
assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
val path = optParams("path")
val basePath = new Path(path)
val sparkContext = sqlContext.sparkContext
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
s"'${BASE_PATH.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
" operation'")
val bootstrapIndexClass = hoodieConfig.getStringOrDefault(INDEX_CLASS_NAME)
var schema: String = null
if (df.schema.nonEmpty) {
val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, namespace).toString
} else {
schema = HoodieAvroUtils.getNullSchema.toString
}
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
if (!hoodieWriteClient.isEmpty) {
hoodieWriteClient.get.close()
}
false
} else {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)
if (!tableExists) {
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
HoodieTableConfig.POPULATE_META_FIELDS.key(),
String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
))
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
val useBaseFormatMetaFile = java.lang.Boolean.parseBoolean(parameters.getOrElse(
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())
))
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.valueOf(tableType))
.setTableName(tableName)
.setRecordKeyFields(recordKeyFields)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
.setBootstrapIndexClass(bootstrapIndexClass)
.setBaseFileFormat(baseFileFormat)
.setBootstrapBasePath(bootstrapBasePath)
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setKeyGeneratorClassProp(keyGenProp)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
.initTable(sparkContext.hadoopConfiguration, path)
}
val jsc = new JavaSparkContext(sqlContext.sparkContext)
val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
schema, path, tableName, mapAsJavaMap(parameters)))
try {
writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
} finally {
writeClient.close()
}
val metaSyncSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
metaSyncSuccess
}
}
def validateSchemaForHoodieIsDeleted(schema: Schema): Unit = {
if (schema.getField(HoodieRecord.HOODIE_IS_DELETED) != null &&
AvroConversionUtils.resolveAvroTypeNullability(schema.getField(HoodieRecord.HOODIE_IS_DELETED).schema())._2.getType != Schema.Type.BOOLEAN) {
throw new HoodieException(HoodieRecord.HOODIE_IS_DELETED + " has to be BOOLEAN type. Passed in dataframe's schema has type "
+ schema.getField(HoodieRecord.HOODIE_IS_DELETED).schema().getType)
}
}
def bulkInsertAsRow(sqlContext: SQLContext,
parameters: Map[String, String],
df: DataFrame,
tblName: String,
basePath: Path,
path: String,
instantTime: String,
partitionColumns: String): (Boolean, common.util.Option[String]) = {
val sparkContext = sqlContext.sparkContext
val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))
val dropPartitionColumns = parameters.get(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key()).map(_.toBoolean)
.getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue())
// register classes & schemas
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
if (dropPartitionColumns) {
schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
}
validateSchemaForHoodieIsDeleted(schema)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
}
val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
if (userDefinedBulkInsertPartitionerOpt.isPresent) {
userDefinedBulkInsertPartitionerOpt.get
}
else {
BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig.getBulkInsertSortMode)
}
} else {
// Sort modes are not yet supported when meta fields are disabled
new NonSortPartitionerWithRows()
}
val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted()
params(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED) = arePartitionRecordsSorted.toString
val isGlobalIndex = if (populateMetaFields) {
SparkHoodieIndexFactory.isGlobalIndex(writeConfig)
} else {
false
}
val hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(df, writeConfig, bulkInsertPartitionerRows, dropPartitionColumns)
if (HoodieSparkUtils.isSpark2) {
hoodieDF.write.format("org.apache.hudi.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.options(params)
.mode(SaveMode.Append)
.save()
} else if (HoodieSparkUtils.isSpark3) {
hoodieDF.write.format("org.apache.hudi.spark3.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
.options(params)
.mode(SaveMode.Append)
.save()
} else {
throw new HoodieException("Bulk insert using row writer is not supported with current Spark version."
+ " To use row writer please switch to spark 2 or spark 3")
}
val syncHiveSuccess = metaSync(sqlContext.sparkSession, writeConfig, basePath, df.schema)
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
}
private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
operation: WriteOperationType, fs: FileSystem): Unit = {
if (mode == SaveMode.Append && tableExists) {
val existingTableName = tableConfig.getTableName
val resolver = spark.sessionState.conf.resolver
if (!resolver(existingTableName, tableName)) {
throw new HoodieException(s"hoodie table with name $existingTableName already exists at $tablePath," +
s" can not append data to the table with another name $tableName.")
}
}
if (operation != WriteOperationType.DELETE) {
if (mode == SaveMode.ErrorIfExists && tableExists) {
throw new HoodieException(s"hoodie table at $tablePath already exists.")
} else if (mode == SaveMode.Overwrite && tableExists && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
// When user set operation as INSERT_OVERWRITE_TABLE,
// overwrite will use INSERT_OVERWRITE_TABLE operator in doWriteOperation
log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.")
fs.delete(tablePath, true)
tableExists = false
}
} else {
// Delete Operation only supports Append mode
if (mode != SaveMode.Append) {
throw new HoodieException(s"Append is the only save mode applicable for ${operation.toString} operation")
}
}
}
private def metaSync(spark: SparkSession, hoodieConfig: HoodieConfig, basePath: Path,
schema: StructType): Boolean = {
val hiveSyncEnabled = hoodieConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_ENABLED).toBoolean
var metaSyncEnabled = hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_ENABLED).toBoolean
var syncClientToolClassSet = scala.collection.mutable.Set[String]()
hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS_NAME).split(",").foreach(syncClass => syncClientToolClassSet += syncClass)
// for backward compatibility
if (hiveSyncEnabled) {
metaSyncEnabled = true
syncClientToolClassSet += classOf[HiveSyncTool].getName
}
if (metaSyncEnabled) {
val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf())
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT);
val properties = new TypedProperties()
properties.putAll(hoodieConfig.getProps)
properties.put(HiveSyncConfigHolder.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key, spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD).toString)
properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key, SPARK_VERSION)
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key, hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
syncClientToolClassSet.foreach(impl => {
SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf, fs, basePath.toString, baseFileFormat)
})
}
// Since Hive tables are now synced as Spark data source tables which are cached after Spark SQL queries
// we must invalidate this table in the cache so writes are reflected in later queries
if (metaSyncEnabled) {
getHiveTableNames(hoodieConfig).foreach(name => {
val qualifiedTableName = String.join(".", hoodieConfig.getStringOrDefault(HIVE_DATABASE), name)
if (spark.catalog.tableExists(qualifiedTableName)) {
spark.catalog.refreshTable(qualifiedTableName)
}
})
}
true
}
private def getHiveTableNames(hoodieConfig: HoodieConfig): List[String] = {
val tableName = hoodieConfig.getStringOrDefault(HIVE_TABLE)
val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
if (tableType.equals(COW_TABLE_TYPE_OPT_VAL)) {
List(tableName)
} else {
val roSuffix = if (hoodieConfig.getBooleanOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE)) {
""
} else {
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE
}
List(tableName + roSuffix,
tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE)
}
}
/**
* Group all table/action specific information into a case class.
*/
case class TableInstantInfo(basePath: Path, instantTime: String, commitActionType: String, operation: WriteOperationType)
private def commitAndPerformPostOperations(spark: SparkSession,
schema: StructType,
writeResult: HoodieWriteResult,
parameters: Map[String, String],
client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
tableConfig: HoodieTableConfig,
jsc: JavaSparkContext,
tableInstantInfo: TableInstantInfo
): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) {
log.info("Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX.key)))
val commitSuccess =
client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap))),
tableInstantInfo.commitActionType,
writeResult.getPartitionToReplaceFileIds)
if (commitSuccess) {
log.info("Commit " + tableInstantInfo.instantTime + " successful!")
}
else {
log.info("Commit " + tableInstantInfo.instantTime + " failed!")
}
val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
val compactionInstant: common.util.Option[java.lang.String] =
if (asyncCompactionEnabled) {
client.scheduleCompaction(common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap))))
} else {
common.util.Option.empty()
}
log.info(s"Compaction Scheduled is $compactionInstant")
val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters)
val clusteringInstant: common.util.Option[java.lang.String] =
if (asyncClusteringEnabled) {
client.scheduleClustering(common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap))))
} else {
common.util.Option.empty()
}
log.info(s"Clustering Scheduled is $clusteringInstant")
val metaSyncSuccess = metaSync(spark, HoodieWriterUtils.convertMapToHoodieConfig(parameters),
tableInstantInfo.basePath, schema)
log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
client.close()
}
(commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
} else {
log.error(s"${tableInstantInfo.operation} failed with errors")
if (log.isTraceEnabled) {
log.trace("Printing out the top 100 errors")
writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)
.take(100)
.foreach(ws => {
log.trace("Global error :", ws.getGlobalError)
if (ws.getErrors.size() > 0) {
ws.getErrors.foreach(kt =>
log.trace(s"Error for key: ${kt._1}", kt._2))
}
})
}
(false, common.util.Option.empty(), common.util.Option.empty())
}
}
private def isAsyncCompactionEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
tableConfig: HoodieTableConfig,
parameters: Map[String, String], configuration: Configuration): Boolean = {
log.info(s"Config.inlineCompactionEnabled ? ${client.getConfig.inlineCompactionEnabled}")
if (asyncCompactionTriggerFnDefined && !client.getConfig.inlineCompactionEnabled
&& parameters.get(ASYNC_COMPACT_ENABLE.key).exists(r => r.toBoolean)) {
tableConfig.getTableType == HoodieTableType.MERGE_ON_READ
} else {
false
}
}
private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
parameters: Map[String, String]): Boolean = {
log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}")
asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled
}
private def getHoodieTableConfig(sparkContext: SparkContext,
tablePath: String,
hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = {
if (tableExists) {
hoodieTableConfigOpt.getOrElse(
HoodieTableMetaClient.builder().setConf(sparkContext.hadoopConfiguration).setBasePath(tablePath)
.build().getTableConfig)
} else {
null
}
}
private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
&& mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key)
}
if (null != tableConfig && mode != SaveMode.Overwrite) {
tableConfig.getProps.foreach { case (key, value) =>
mergedParams(key) = value
}
}
// use preCombineField to fill in PAYLOAD_ORDERING_FIELD_PROP_KEY
if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
mergedParams.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, mergedParams(PRECOMBINE_FIELD.key()))
}
val params = mergedParams.toMap
(params, HoodieWriterUtils.convertMapToHoodieConfig(params))
}
private def extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenerator: String,
params: Map[String, String]): Map[String, String] = {
if (classOf[TimestampBasedKeyGenerator].getCanonicalName.equals(keyGenerator) ||
classOf[TimestampBasedAvroKeyGenerator].getCanonicalName.equals(keyGenerator)) {
params.filterKeys(HoodieTableConfig.PERSISTED_CONFIG_LIST.contains)
} else {
Map.empty
}
}
}