/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command

import java.util.Properties
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.internal.SQLConf

/**
* Command for inserting the result of a query into a hoodie table.
*
* @param logicalRelation The logical relation for the target hoodie table.
* @param query The logical plan of the insert query.
* @param partition The insert partition map: each partition column name maps to an
*                  optional static value (None for dynamic partitions).
* @param overwrite Whether to overwrite the table or its partitions.
*/
case class InsertIntoHoodieTableCommand(
logicalRelation: LogicalRelation,
query: LogicalPlan,
partition: Map[String, Option[String]],
overwrite: Boolean)
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
assert(logicalRelation.catalogTable.isDefined, "Missing catalog table")
val table = logicalRelation.catalogTable.get
InsertIntoHoodieTableCommand.run(sparkSession, table, query, partition, overwrite)
Seq.empty[Row]
}
}

object InsertIntoHoodieTableCommand {
/**
* Run the insert query. Both dynamic partition insert and static partition insert are
* supported.
* @param sparkSession The spark session.
* @param table The table to insert into.
* @param query The insert query.
* @param insertPartitions The specified insert partition map.
*                         e.g. "insert into h(dt = '2021') select id, name from src"
*                         "dt" is the key in the map and "2021" is the partition value.
*                         If the partition value has not been specified (the dynamic
*                         partition case), it is None in the map.
* @param overwrite Whether to overwrite the table.
* @param refreshTable Whether to refresh the table after the insert has finished.
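*
* For example (a sketch; the table and column names are illustrative):
* {{{
*   -- static partition insert: the value of 'dt' is fixed to '2021'
*   insert into h partition(dt = '2021') select id, name from src
*
*   -- dynamic partition insert: 'dt' is taken from the last column of the query
*   insert into h select id, name, dt from src
* }}}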
*/
def run(sparkSession: SparkSession, table: CatalogTable, query: LogicalPlan,
insertPartitions: Map[String, Option[String]],
overwrite: Boolean, refreshTable: Boolean = true): Boolean = {
val config = buildHoodieInsertConfig(table, sparkSession, overwrite, insertPartitions)
val mode = if (overwrite && table.partitionColumnNames.isEmpty) {
// insert overwrite a non-partitioned table
SaveMode.Overwrite
} else {
// for insert into, or insert overwrite of a partitioned table, we use append mode
SaveMode.Append
}
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(config)
val conf = sparkSession.sessionState.conf
val alignedQuery = alignOutputFields(query, table, insertPartitions, conf)
// If we create the DataFrame using Dataset.ofRows(sparkSession, alignedQuery),
// the nullable attribute of the fields will be lost.
// In order to pass the nullable attribute to the inputDF, we specify the schema
// of the RDD explicitly.
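// A minimal sketch of the two code paths (the df name below is illustrative):
//   val df = Dataset.ofRows(sparkSession, alignedQuery)        // nullability reset
//   sparkSession.createDataFrame(df.rdd, alignedQuery.schema)  // nullability preserved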
val inputDF = sparkSession.createDataFrame(
Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
val success =
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, inputDF)._1
if (success) {
if (refreshTable) {
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
}
true
} else {
false
}
}

/**
* Align the types and names of the query's output fields with the target table's fields.
* @param query The insert query to align.
* @param table The target table.
* @param insertPartitions The insert partition map.
* @param conf The SQLConf.
* @return The aligned logical plan.
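*
* For example, given a table h(id int, name string, dt string) partitioned by dt
* (a sketch; the names are illustrative):
* {{{
*   -- dynamic partition: the trailing query column is cast and renamed to 'dt'
*   insert into h select 1, 'a1', '2021'
*
*   -- static partition: the query only provides the data columns; 'dt' is added
*   -- as a literal from the partition spec
*   insert into h partition(dt = '2021') select 1, 'a1'
* }}}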
*/
private def alignOutputFields(
query: LogicalPlan,
table: CatalogTable,
insertPartitions: Map[String, Option[String]],
conf: SQLConf): LogicalPlan = {
val targetPartitionSchema = table.partitionSchema
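// e.g. for "insert into h partition(dt = '2021') ...", the static partition values below
// are Map("dt" -> "2021"); for a dynamic partition insert the map is empty.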
val staticPartitionValues = insertPartitions.filter(p => p._2.isDefined).mapValues(_.get)
assert(staticPartitionValues.isEmpty ||
staticPartitionValues.size == targetPartitionSchema.size,
s"Required partition columns are: ${targetPartitionSchema.json}, current static partitions " +
s"are: ${staticPartitionValues.mkString(",")}")
assert(staticPartitionValues.size + query.output.size == table.schema.size,
s"Required select columns count: ${removeMetaFields(table.schema).size}, " +
s"current select columns (including static partition columns) count: " +
s"${staticPartitionValues.size + removeMetaFields(query.output).size}, columns: " +
s"(${(removeMetaFields(query.output).map(_.name) ++ staticPartitionValues.keys).mkString(",")})")
val queryDataFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition
query.output.dropRight(targetPartitionSchema.fields.length)
} else { // insert static partition
query.output
}
val targetDataSchema = table.dataSchema
// Align for the data fields of the query
val dataProjects = queryDataFields.zip(targetDataSchema.fields).map {
case (dataAttr, targetField) =>
val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable),
targetField.dataType, conf)
Alias(castAttr, targetField.name)()
}
val partitionProjects = if (staticPartitionValues.isEmpty) { // insert dynamic partitions
// The partition attributes follow the data attributes in the query,
// so we initialize partitionAttrPosition with the data schema size.
var partitionAttrPosition = targetDataSchema.size
targetPartitionSchema.fields.map(f => {
val partitionAttr = query.output(partitionAttrPosition)
partitionAttrPosition = partitionAttrPosition + 1
val castAttr = castIfNeeded(partitionAttr.withNullability(f.nullable), f.dataType, conf)
Alias(castAttr, f.name)()
})
} else { // insert static partitions
targetPartitionSchema.fields.map(f => {
val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
throw new IllegalArgumentException(s"Missing static partition value for: ${f.name}"))
val castAttr = castIfNeeded(Literal.create(staticPartitionValue), f.dataType, conf)
Alias(castAttr, f.name)()
})
}
// Remove the hoodie meta fields from the projects as we do not need them for writing
val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.name))
val alignedProjects = withoutMetaFieldDataProjects ++ partitionProjects
Project(alignedProjects, query)
}

/**
* Build the default hoodie config for the insert.
* @param table The target table.
* @param sparkSession The spark session.
* @param isOverwrite Whether this is an insert overwrite.
* @param insertPartitions The specified insert partition map.
* @return The hoodie write config for the insert.
*/
private def buildHoodieInsertConfig(
table: CatalogTable,
sparkSession: SparkSession,
isOverwrite: Boolean,
insertPartitions: Map[String, Option[String]] = Map.empty): Map[String, String] = {
if (insertPartitions.nonEmpty &&
(insertPartitions.keys.toSet != table.partitionColumnNames.toSet)) {
throw new IllegalArgumentException(s"Insert partition fields " +
s"[${insertPartitions.keys.mkString(",")}] are not equal to the partition fields " +
s"defined in the table [${table.partitionColumnNames.mkString(",")}]")
}
val parameters = HoodieOptionConfig.mappingSqlOptionToHoodieParam(table.storage.properties)
val tableType = parameters.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue)
val partitionFields = table.partitionColumnNames.mkString(",")
val path = getTableLocation(table, sparkSession)
.getOrElse(throw new IllegalArgumentException(s"Missing location for table ${table.identifier}"))
val tableSchema = table.schema
val options = table.storage.properties
val primaryColumns = HoodieOptionConfig.getPrimaryColumns(options)
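// Use the SQL key generator when primary key columns are defined; otherwise fall back
// to a UUID based key generator.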
val keyGenClass = if (primaryColumns.nonEmpty) {
classOf[SqlKeyGenerator].getCanonicalName
} else {
classOf[UuidKeyGenerator].getName
}
val dropDuplicate = sparkSession.conf
.getOption(INSERT_DROP_DUPS.key)
.getOrElse(INSERT_DROP_DUPS.defaultValue)
.toBoolean
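// Operation selection, summarizing the branches below:
//   overwrite && partitioned table        -> insert_overwrite (overwrite the partitions)
//   overwrite && non-partitioned table    -> insert (combined with SaveMode.Overwrite)
//   primary key defined && !dropDuplicate -> upsert
//   otherwise                             -> insert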
val operation = if (isOverwrite) {
if (table.partitionColumnNames.nonEmpty) {
INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
} else {
INSERT_OPERATION_OPT_VAL
}
} else {
if (primaryColumns.nonEmpty && !dropDuplicate) {
UPSERT_OPERATION_OPT_VAL
} else {
INSERT_OPERATION_OPT_VAL
}
}
val payloadClassName = if (primaryColumns.nonEmpty && !dropDuplicate &&
tableType == COW_TABLE_TYPE_OPT_VAL) {
// Only validate duplicate keys for COW. For MOR the merge is done with the
// DefaultHoodieRecordPayload on read.
classOf[ValidateDuplicateKeyPayload].getCanonicalName
} else {
classOf[DefaultHoodieRecordPayload].getCanonicalName
}
val enableHive = isEnableHive(sparkSession)
withSparkConf(sparkSession, options) {
Map(
"path" -> path,
TABLE_TYPE.key -> tableType,
TABLE_NAME.key -> table.identifier.table,
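// The last field of the table schema is used as the preCombine field by default.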
PRECOMBINE_FIELD.key -> tableSchema.fields.last.name,
OPERATION.key -> operation,
KEYGENERATOR_CLASS.key -> keyGenClass,
RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
PARTITIONPATH_FIELD.key -> partitionFields,
PAYLOAD_CLASS.key -> payloadClassName,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> table.identifier.database.getOrElse("default"),
HIVE_TABLE.key -> table.identifier.table,
HIVE_SUPPORT_TIMESTAMP.key -> "true",
HIVE_STYLE_PARTITIONING.key -> "true",
HIVE_PARTITION_FIELDS.key -> partitionFields,
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
URL_ENCODE_PARTITIONING.key -> "true",
HoodieWriteConfig.INSERT_PARALLELISM.key -> "200",
HoodieWriteConfig.UPSERT_PARALLELISM.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> table.partitionSchema.toDDL
)
}
}
}

/**
* A payload that validates there are no duplicate keys for an insert statement when the
* INSERT_DROP_DUPS config is not enabled. If a record with the same key already exists,
* a HoodieDuplicateKeyException is thrown when the records are merged.
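*
* For example (a sketch; the table and values are illustrative):
* {{{
*   insert into h values(1, 'a1', 10, '2021')
*   insert into h values(1, 'a2', 20, '2021')  -- throws HoodieDuplicateKeyException
* }}}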
*/
class ValidateDuplicateKeyPayload(record: GenericRecord, orderingVal: Comparable[_])
extends DefaultHoodieRecordPayload(record, orderingVal) {
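// Secondary constructor taking an optional record, matching the constructor form used
// by the other hoodie record payload classes.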
def this(record: HOption[GenericRecord]) {
this(if (record.isPresent) record.get else null, 0)
}
override def combineAndGetUpdateValue(currentValue: IndexedRecord,
schema: Schema, properties: Properties): HOption[IndexedRecord] = {
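// combineAndGetUpdateValue is only invoked when a record with the same key already
// exists, so reaching this point means the insert would produce a duplicate key.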
val key = currentValue.asInstanceOf[GenericRecord].get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
throw new HoodieDuplicateKeyException(key)
}
}