| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.hudi.command |
| |
| import java.util.Base64 |
| import org.apache.avro.Schema |
| import org.apache.hudi.DataSourceWriteOptions._ |
| import org.apache.hudi.config.HoodieWriteConfig |
| import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME |
| import org.apache.hudi.hive.MultiPartKeysValueExtractor |
| import org.apache.hudi.hive.ddl.HiveSyncMode |
| import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils, SparkAdapterSupport} |
| import org.apache.spark.sql.catalyst.TableIdentifier |
| import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal} |
| import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, InsertAction, MergeIntoTable, SubqueryAlias, UpdateAction} |
| import org.apache.spark.sql.execution.command.RunnableCommand |
| import org.apache.spark.sql.hudi.HoodieSqlUtils._ |
| import org.apache.spark.sql.types.{BooleanType, StructType} |
| import org.apache.spark.sql._ |
| import org.apache.spark.sql.hudi.{HoodieOptionConfig, SerDeUtils} |
| import org.apache.spark.sql.hudi.command.payload.ExpressionPayload |
| import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._ |
| |
| /** |
| * The Command for hoodie MergeIntoTable. |
| * The match on condition must contain the row key fields currently, so that we can use Hoodie |
| * Index to speed up the performance. |
| * |
| * The main algorithm: |
| * |
| * We pushed down all the matched and not matched (condition, assignment) expression pairs to the |
| * ExpressionPayload. And the matched (condition, assignment) expression pairs will execute in the |
| * ExpressionPayload#combineAndGetUpdateValue to compute the result record, while the not matched |
| * expression pairs will execute in the ExpressionPayload#getInsertValue. |
| * |
| * For Mor table, it is a litter complex than this. The matched record also goes through the getInsertValue |
| * and write append to the log. So the update actions & insert actions should process by the same |
| * way. We pushed all the update actions & insert actions together to the |
| * ExpressionPayload#getInsertValue. |
| * |
| */ |
| case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends RunnableCommand |
| with SparkAdapterSupport { |
| |
| private var sparkSession: SparkSession = _ |
| |
| /** |
   * The target table identifier.
| */ |
| private lazy val targetTableIdentify: TableIdentifier = getMergeIntoTargetTableId(mergeInto) |
| |
| /** |
   * The output attributes of the source table, without hoodie meta fields.
| */ |
| private var sourceDFOutput = mergeInto.sourceTable.output.filter(attr => !isMetaField(attr.name)) |
| |
| /** |
| * The target table schema without hoodie meta fields. |
| */ |
| private lazy val targetTableSchemaWithoutMetaFields = |
| removeMetaFields(mergeInto.targetTable.schema).fields |
| |
| private lazy val targetTable = |
| sparkSession.sessionState.catalog.getTableMetadata(targetTableIdentify) |
| |
| private lazy val targetTableType = |
| HoodieOptionConfig.getTableType(targetTable.storage.properties) |
| |
| /** |
| * |
| * Return a map of target key to the source expression from the Merge-On Condition. |
| * e.g. merge on t.id = s.s_id AND t.name = s.s_name, we return |
| * Map("id" -> "s_id", "name" ->"s_name") |
| * TODO Currently Non-equivalent conditions are not supported. |
| */ |
| private lazy val targetKey2SourceExpression: Map[String, Expression] = { |
| val conditions = splitByAnd(mergeInto.mergeCondition) |
| val allEqs = conditions.forall(p => p.isInstanceOf[EqualTo]) |
| if (!allEqs) { |
| throw new IllegalArgumentException("Non-Equal condition is not support for Merge " + |
| s"Into Statement: ${mergeInto.mergeCondition.sql}") |
| } |
| val targetAttrs = mergeInto.targetTable.output |
| |
| val target2Source = conditions.map(_.asInstanceOf[EqualTo]) |
| .map { |
| case EqualTo(left: AttributeReference, right) |
| if targetAttrs.indexOf(left) >= 0 => // left is the target field |
| left.name -> right |
| case EqualTo(left, right: AttributeReference) |
| if targetAttrs.indexOf(right) >= 0 => // right is the target field |
| right.name -> left |
| case eq => |
| throw new AnalysisException(s"Invalidate Merge-On condition: ${eq.sql}." + |
| "The validate condition should be 'targetColumn = sourceColumnExpression', e.g." + |
| " t.id = s.id and t.dt = from_unixtime(s.ts)") |
| }.toMap |
| target2Source |
| } |
| |
| /** |
| * Get the mapping of target preCombineField to the source expression. |
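   * e.g. for "update set ts = s0.ts + 1" with pre-combine field "ts", we return
   * ("ts", "s0.ts + 1"). If there is no update action, the source column at the same
   * position as the target pre-combine field is used.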
| */ |
  private lazy val target2SourcePreCombineField: Option[(String, Expression)] = {
| val updateActions = mergeInto.matchedActions.collect { case u: UpdateAction => u } |
| assert(updateActions.size <= 1, s"Only support one updateAction currently, current update action count is: ${updateActions.size}") |
| |
| val updateAction = updateActions.headOption |
| HoodieOptionConfig.getPreCombineField(targetTable.storage.properties).map(preCombineField => { |
| val sourcePreCombineField = |
| updateAction.map(u => u.assignments.filter { |
| case Assignment(key: AttributeReference, _) => key.name.equalsIgnoreCase(preCombineField) |
          case _ => false
| }.head.value |
| ).getOrElse { |
        // If there is no update action, map the target columns to the source columns by position.
| val target2Source = mergeInto.targetTable.output |
| .filter(attr => !isMetaField(attr.name)) |
| .map(_.name) |
| .zip(mergeInto.sourceTable.output.filter(attr => !isMetaField(attr.name))) |
| .toMap |
| target2Source.getOrElse(preCombineField, null) |
| } |
| (preCombineField, sourcePreCombineField) |
| }).filter(p => p._2 != null) |
| } |
| |
| override def run(sparkSession: SparkSession): Seq[Row] = { |
| this.sparkSession = sparkSession |
| |
| // Create the write parameters |
| val parameters = buildMergeIntoConfig(mergeInto) |
| |
| val sourceDF = buildSourceDF(sparkSession) |
| |
| if (mergeInto.matchedActions.nonEmpty) { // Do the upsert |
| executeUpsert(sourceDF, parameters) |
    } else { // If there are no matched actions in the statement, execute the insert operation only.
| executeInsertOnly(sourceDF, parameters) |
| } |
| sparkSession.catalog.refreshTable(targetTableIdentify.unquotedString) |
| Seq.empty[Row] |
| } |
| |
| /** |
   * Build the sourceDF. We append the source primary key expressions and the
   * pre-combine field expression to the sourceDF.
| * e.g. |
| * <p> |
| * merge into h0 |
| * using (select 1 as id, 'a1' as name, 1000 as ts) s0 |
| * on h0.id = s0.id + 1 |
| * when matched then update set id = s0.id, name = s0.name, ts = s0.ts + 1 |
| * </p> |
| * "ts" is the pre-combine field of h0. |
| * |
| * The targetKey2SourceExpression is: ("id", "s0.id + 1"). |
   * The target2SourcePreCombineField is: ("ts", "s0.ts + 1").
| * We will append the "s0.id + 1 as id" and "s0.ts + 1 as ts" to the sourceDF to compute the |
| * row key and pre-combine field. |
| * |
| */ |
| private def buildSourceDF(sparkSession: SparkSession): DataFrame = { |
| var sourceDF = Dataset.ofRows(sparkSession, mergeInto.sourceTable) |
| targetKey2SourceExpression.foreach { |
| case (targetColumn, sourceExpression) |
| if !isEqualToTarget(targetColumn, sourceExpression) => |
| sourceDF = sourceDF.withColumn(targetColumn, new Column(sourceExpression)) |
| sourceDFOutput = sourceDFOutput :+ AttributeReference(targetColumn, sourceExpression.dataType)() |
      case _ =>
| } |
    target2SourcePreCombineField.foreach {
| case (targetPreCombineField, sourceExpression) |
| if !isEqualToTarget(targetPreCombineField, sourceExpression) => |
| sourceDF = sourceDF.withColumn(targetPreCombineField, new Column(sourceExpression)) |
| sourceDFOutput = sourceDFOutput :+ AttributeReference(targetPreCombineField, sourceExpression.dataType)() |
      case _ =>
| } |
| sourceDF |
| } |
| |
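  /**
   * Whether the source expression is simply a (possibly cast) reference to the target column,
   * in which case there is no need to append an extra column to the sourceDF.
   */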
| private def isEqualToTarget(targetColumnName: String, sourceExpression: Expression): Boolean = { |
| sourceExpression match { |
| case attr: AttributeReference if attr.name.equalsIgnoreCase(targetColumnName) => true |
| case Cast(attr: AttributeReference, _, _) if attr.name.equalsIgnoreCase(targetColumnName) => true |
      case _ => false
| } |
| } |
| |
| /** |
| * Execute the update and delete action. All the matched and not-matched actions will |
| * execute in one upsert write operation. We pushed down the matched condition and assignment |
| * expressions to the ExpressionPayload#combineAndGetUpdateValue and the not matched |
| * expressions to the ExpressionPayload#getInsertValue. |
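   * The serialized (condition, assignments) pairs are carried to the executors inside the
   * write config; ExpressionPayload is assumed to deserialize and evaluate them per record.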
| */ |
| private def executeUpsert(sourceDF: DataFrame, parameters: Map[String, String]): Unit = { |
    val updateActions = mergeInto.matchedActions.collect { case u: UpdateAction => u }
| // Check for the update actions |
| checkUpdateAssignments(updateActions) |
| |
    val deleteActions = mergeInto.matchedActions.collect { case d: DeleteAction => d }
| assert(deleteActions.size <= 1, "Should be only one delete action in the merge into statement.") |
| val deleteAction = deleteActions.headOption |
| |
| val insertActions = |
| mergeInto.notMatchedActions.map(_.asInstanceOf[InsertAction]) |
| |
| // Check for the insert actions |
| checkInsertAssignments(insertActions) |
| |
    // Append the table schema to the parameters. In the case of merge into, the schema of the
    // sourceDF may differ from the target table's, because there may be transform logic in the
    // update or insert actions.
| var writeParams = parameters + |
| (OPERATION.key -> UPSERT_OPERATION_OPT_VAL) + |
| (HoodieWriteConfig.WRITE_SCHEMA_PROP.key -> getTableSchema.toString) + |
| (DataSourceWriteOptions.TABLE_TYPE.key -> targetTableType) |
| |
| // Map of Condition -> Assignments |
| val updateConditionToAssignments = |
| updateActions.map(update => { |
| val rewriteCondition = update.condition.map(replaceAttributeInExpression) |
| .getOrElse(Literal.create(true, BooleanType)) |
| val formatAssignments = rewriteAndReOrderAssignments(update.assignments) |
| rewriteCondition -> formatAssignments |
| }).toMap |
| // Serialize the Map[UpdateCondition, UpdateAssignments] to base64 string |
| val serializedUpdateConditionAndExpressions = Base64.getEncoder |
| .encodeToString(SerDeUtils.toBytes(updateConditionToAssignments)) |
| writeParams += (PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS -> |
| serializedUpdateConditionAndExpressions) |
| |
| if (deleteAction.isDefined) { |
| val deleteCondition = deleteAction.get.condition |
| .map(replaceAttributeInExpression) |
| .getOrElse(Literal.create(true, BooleanType)) |
| // Serialize the Map[DeleteCondition, empty] to base64 string |
| val serializedDeleteCondition = Base64.getEncoder |
| .encodeToString(SerDeUtils.toBytes(Map(deleteCondition -> Seq.empty[Assignment]))) |
| writeParams += (PAYLOAD_DELETE_CONDITION -> serializedDeleteCondition) |
| } |
| |
| // Serialize the Map[InsertCondition, InsertAssignments] to base64 string |
| writeParams += (PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS -> |
| serializedInsertConditionAndExpressions(insertActions)) |
| |
| // Remove the meta fields from the sourceDF as we do not need these when writing. |
| val sourceDFWithoutMetaFields = removeMetaFields(sourceDF) |
| HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDFWithoutMetaFields) |
| } |
| |
| /** |
| * If there are not matched actions, we only execute the insert operation. |
| * @param sourceDF |
| * @param parameters |
| */ |
| private def executeInsertOnly(sourceDF: DataFrame, parameters: Map[String, String]): Unit = { |
| val insertActions = mergeInto.notMatchedActions.map(_.asInstanceOf[InsertAction]) |
| checkInsertAssignments(insertActions) |
| |
| var writeParams = parameters + |
| (OPERATION.key -> INSERT_OPERATION_OPT_VAL) + |
| (HoodieWriteConfig.WRITE_SCHEMA_PROP.key -> getTableSchema.toString) |
| |
| writeParams += (PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS -> |
| serializedInsertConditionAndExpressions(insertActions)) |
| |
| // Remove the meta fields from the sourceDF as we do not need these when writing. |
| val sourceDFWithoutMetaFields = removeMetaFields(sourceDF) |
| HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDFWithoutMetaFields) |
| } |
| |
| private def checkUpdateAssignments(updateActions: Seq[UpdateAction]): Unit = { |
| updateActions.foreach(update => |
| assert(update.assignments.length == targetTableSchemaWithoutMetaFields.length, |
| s"The number of update assignments[${update.assignments.length}] must equal to the " + |
| s"targetTable field size[${targetTableSchemaWithoutMetaFields.length}]")) |
    // For a MOR table, a target table field cannot appear on the right-hand side of an update assignment.
| if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) { |
| updateActions.foreach(update => { |
| val targetAttrs = update.assignments.flatMap(a => a.value.collect { |
| case attr: AttributeReference if mergeInto.targetTable.outputSet.contains(attr) => attr |
| }) |
| assert(targetAttrs.isEmpty, |
| s"Target table's field(${targetAttrs.map(_.name).mkString(",")}) cannot be the right-value of the update clause for MOR table.") |
| }) |
| } |
| } |
| |
| private def checkInsertAssignments(insertActions: Seq[InsertAction]): Unit = { |
| insertActions.foreach(insert => |
| assert(insert.assignments.length == targetTableSchemaWithoutMetaFields.length, |
| s"The number of insert assignments[${insert.assignments.length}] must equal to the " + |
| s"targetTable field size[${targetTableSchemaWithoutMetaFields.length}]")) |
| |
| } |
| |
| private def getTableSchema: Schema = { |
| val (structName, nameSpace) = AvroConversionUtils |
| .getAvroRecordNameAndNamespace(targetTableIdentify.identifier) |
| AvroConversionUtils.convertStructTypeToAvroSchema( |
| new StructType(targetTableSchemaWithoutMetaFields), structName, nameSpace) |
| } |
| |
| /** |
   * Serialize the Map[InsertCondition, InsertAssignments] to a base64 string.
   * @param insertActions the insert actions of the merge statement
   * @return the base64-encoded serialized map
| */ |
| private def serializedInsertConditionAndExpressions(insertActions: Seq[InsertAction]): String = { |
| val insertConditionAndAssignments = |
| insertActions.map(insert => { |
| val rewriteCondition = insert.condition.map(replaceAttributeInExpression) |
| .getOrElse(Literal.create(true, BooleanType)) |
| val formatAssignments = rewriteAndReOrderAssignments(insert.assignments) |
| // Do the check for the insert assignments |
| checkInsertExpression(formatAssignments) |
| |
| rewriteCondition -> formatAssignments |
| }).toMap |
| Base64.getEncoder.encodeToString( |
| SerDeUtils.toBytes(insertConditionAndAssignments)) |
| } |
| |
| /** |
   * Rewrite and reorder the assignments.
   * The rewrite replaces each AttributeReference with a BoundReference.
   * The reorder makes the order of the assignments match the field order of the target table.
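   * e.g. for target fields (id, name, price) and the clause "update set price = s0.price,
   * id = s0.id, name = s0.name" (names illustrative), the assignments are reordered to
   * (s0.id, s0.name, s0.price), with each attribute replaced by its bound input ordinal.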
   * @param assignments the assignment expressions of a merge action
   * @return the rewritten expressions, ordered by the target table fields
| */ |
| private def rewriteAndReOrderAssignments(assignments: Seq[Expression]): Seq[Expression] = { |
| val attr2Assignment = assignments.map { |
| case Assignment(attr: AttributeReference, value) => { |
| val rewriteValue = replaceAttributeInExpression(value) |
| attr -> Alias(rewriteValue, attr.name)() |
| } |
| case assignment => throw new IllegalArgumentException(s"Illegal Assignment: ${assignment.sql}") |
| }.toMap[Attribute, Expression] |
| // reorder the assignments by the target table field |
| mergeInto.targetTable.output |
| .filterNot(attr => isMetaField(attr.name)) |
| .map(attr => { |
| val assignment = attr2Assignment.getOrElse(attr, |
| throw new IllegalArgumentException(s"Cannot find related assignment for field: ${attr.name}")) |
| castIfNeeded(assignment, attr.dataType, sparkSession.sqlContext.conf) |
| }) |
| } |
| |
| /** |
   * Replace each AttributeReference with a BoundReference. This is for the convenience of
   * codegen in ExpressionCodeGen, which uses the field index to generate the code, so we must
   * replace AttributeReference with BoundReference here.
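   * The bound ordinal indexes into the source output attributes followed by the target table
   * fields (sourceJoinTargetFields below), which is assumed to match the joined record layout
   * that ExpressionPayload evaluates these expressions against.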
   * @param exp the expression to rewrite
   * @return the expression with all attributes replaced by bound references
| */ |
| private def replaceAttributeInExpression(exp: Expression): Expression = { |
| val sourceJoinTargetFields = sourceDFOutput ++ |
| mergeInto.targetTable.output.filterNot(attr => isMetaField(attr.name)) |
| |
| exp transform { |
| case attr: AttributeReference => |
| val index = sourceJoinTargetFields.indexWhere(p => p.semanticEquals(attr)) |
| if (index == -1) { |
| throw new IllegalArgumentException(s"cannot find ${attr.qualifiedName} in source or " + |
| s"target at the merge into statement") |
| } |
| BoundReference(index, attr.dataType, attr.nullable) |
| case other => other |
| } |
| } |
| |
| /** |
| * Check the insert action expression. |
   * The insert expressions must not reference any target table field.
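   * Since ordinals are bound against the source fields followed by the target fields, any
   * BoundReference with ordinal >= sourceDFOutput.size refers to a target table field.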
| */ |
| private def checkInsertExpression(expressions: Seq[Expression]): Unit = { |
| expressions.foreach(exp => { |
| val references = exp.collect { |
| case reference: BoundReference => reference |
| } |
| references.foreach(ref => { |
| if (ref.ordinal >= sourceDFOutput.size) { |
| val targetColumn = targetTableSchemaWithoutMetaFields(ref.ordinal - sourceDFOutput.size) |
| throw new IllegalArgumentException(s"Insert clause cannot contain target table's field: ${targetColumn.name}" + |
| s" in ${exp.sql}") |
| } |
| }) |
| }) |
| } |
| |
| /** |
   * Create the config for the hoodie writer.
   * @param mergeInto the merge into plan
   * @return the write config map
| */ |
| private def buildMergeIntoConfig(mergeInto: MergeIntoTable): Map[String, String] = { |
| |
| val targetTableDb = targetTableIdentify.database.getOrElse("default") |
| val targetTableName = targetTableIdentify.identifier |
    val path = getTableLocation(targetTable, sparkSession)
      .getOrElse(throw new IllegalArgumentException(s"Missing location for table $targetTableIdentify"))
| |
| val options = targetTable.storage.properties |
| val definedPk = HoodieOptionConfig.getPrimaryColumns(options) |
    // TODO Currently the merge-on condition keys must be the same as the primary key.
| if (targetKey2SourceExpression.keySet != definedPk.toSet) { |
| throw new IllegalArgumentException(s"Merge Key[${targetKey2SourceExpression.keySet.mkString(",")}] is not" + |
| s" Equal to the defined primary key[${definedPk.mkString(",")}] in table $targetTableName") |
| } |
    // Enable hive sync by default if spark has enabled the hive metastore.
| val enableHive = isEnableHive(sparkSession) |
| HoodieWriterUtils.parametersWithWriteDefaults( |
| withSparkConf(sparkSession, options) { |
| Map( |
| "path" -> path, |
| RECORDKEY_FIELD.key -> targetKey2SourceExpression.keySet.mkString(","), |
| KEYGENERATOR_CLASS.key -> classOf[SqlKeyGenerator].getCanonicalName, |
| PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field |
| TABLE_NAME.key -> targetTableName, |
| PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","), |
| PAYLOAD_CLASS.key -> classOf[ExpressionPayload].getCanonicalName, |
| META_SYNC_ENABLED.key -> enableHive.toString, |
| HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), |
| HIVE_USE_JDBC.key -> "false", |
| HIVE_DATABASE.key -> targetTableDb, |
| HIVE_TABLE.key -> targetTableName, |
| HIVE_SUPPORT_TIMESTAMP.key -> "true", |
| HIVE_STYLE_PARTITIONING.key -> "true", |
| HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","), |
| HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName, |
| URL_ENCODE_PARTITIONING.key -> "true", // enable the url decode for sql. |
| HoodieWriteConfig.INSERT_PARALLELISM.key -> "200", // set the default parallelism to 200 for sql |
| HoodieWriteConfig.UPSERT_PARALLELISM.key -> "200", |
| HoodieWriteConfig.DELETE_PARALLELISM.key -> "200", |
| SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL |
| ) |
| }) |
| } |
| } |