[HUDI-6658] Inject filters for incremental query (#10225)
Add incremental filters to the query plan
Also fixes some tests that used the partition path as the precombine field
Incremental queries will now work with the new file group reader
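For context, a minimal sketch of the kind of Spark incremental read this change targets. The option keys are the standard Hudi datasource read options; the table path and instant times are placeholders:

    // Read only the records written between two commit instants. With the new
    // file group reader enabled, the required incremental filters are now
    // injected into the logical plan (see maybeApplyForNewFileFormat below).
    val incrementalDF = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", "20231201000000") // placeholder instant
      .option("hoodie.datasource.read.end.instanttime", "20231202000000")   // placeholder instant
      .load("/path/to/hudi/table")                                          // placeholder path
    incrementalDF.show()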
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index 64ee645..b9110f1 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -102,9 +102,11 @@
* Spark requires file formats to append the partition path fields to the end of the schema.
* For tables where the partition path fields are not at the end of the schema, we don't want
* to return the schema in the wrong order when they do a query like "select *". To fix this
- * behavior, we apply a projection onto FileScan when the file format is NewHudiParquetFileFormat
+ * behavior, we apply a projection onto FileScan when the file format has HoodieFormatTrait
+ *
+ * Additionally, incremental queries require filters to be added to the plan
*/
- def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan
+ def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan
/**
* Decomposes [[InsertIntoStatement]] into its arguments allowing to accommodate for API
@@ -140,4 +142,9 @@
def failAnalysisForMIT(a: Attribute, cols: String): Unit = {}
def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, condition: Option[Expression], hint: String): LogicalPlan
+
+ /**
+ * Returns true if both plans produce the same attributes in the same order
+ */
+ def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 168502b..ac8286b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -285,7 +285,12 @@
resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
}
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
- new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
+ if (fileFormatUtils.isDefined) {
+ new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+ } else {
+ new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
+ }
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
if (fileFormatUtils.isDefined) {
@@ -304,7 +309,12 @@
}
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
- MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)
+ if (fileFormatUtils.isDefined) {
+ new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+ } else {
+ MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)
+ }
case (_, _, true) =>
if (fileFormatUtils.isDefined) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
index 37e42bc..959f7d2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
@@ -28,6 +28,7 @@
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
+import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import scala.jdk.CollectionConverters.{asScalaBufferConverter, mapAsScalaMapConverter}
@@ -78,4 +79,8 @@
new Path(fileGroupId.getPartitionPath, fileGroupId.getFileId).toString
}.toArray
}
+
+ override def getRequiredFilters: Seq[Filter] = {
+ Seq.empty
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index 88f7224..aee1518 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -281,7 +281,7 @@
sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), true, true)
override def buildFileFormat(): FileFormat = {
- if (fileGroupReaderEnabled && !isBootstrap) {
+ if (fileGroupReaderEnabled) {
new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
@@ -318,7 +318,7 @@
shouldEmbedFileSlices = true)
override def buildFileFormat(): FileFormat = {
- if (fileGroupReaderEnabled && !isBootstrap) {
+ if (fileGroupReaderEnabled) {
new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
@@ -349,7 +349,7 @@
sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), false, isBootstrap)
override def buildFileFormat(): FileFormat = {
- if (fileGroupReaderEnabled && !isBootstrap) {
+ if (fileGroupReaderEnabled) {
new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
index 7e765b4..d541e73 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
@@ -29,7 +29,7 @@
import org.apache.spark.sql.types.StructType
import java.util.stream.Collectors
-import scala.jdk.CollectionConverters.asScalaBufferConverter
+import scala.collection.JavaConverters._
class HoodieIncrementalFileIndex(override val spark: SparkSession,
override val metaClient: HoodieTableMetaClient,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
new file mode 100644
index 0000000..0587f13
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.{HoodieCDCFileIndex, SparkAdapterSupport, SparkHoodieTableFileIndex}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Contains, EndsWith, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, NamedExpression, Not, Or, StartsWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, ParquetFileFormat}
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+import scala.util.Try
+
+object FileFormatUtilsForFileGroupReader extends SparkAdapterSupport {
+
+ def applyNewFileFormatChanges(scanOperation: LogicalPlan, logicalRelation: LogicalPlan, fs: HadoopFsRelation): LogicalPlan = {
+ val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+ ff.isProjected = true
+ val tableSchema = fs.location match {
+ case index: HoodieCDCFileIndex => index.cdcRelation.schema
+ case index: SparkHoodieTableFileIndex => index.schema
+ }
+ val resolvedSchema = logicalRelation.resolve(tableSchema, fs.sparkSession.sessionState.analyzer.resolver)
+ val unfilteredPlan = if (!fs.partitionSchema.fields.isEmpty && sparkAdapter.getCatalystPlanUtils.produceSameOutput(scanOperation, logicalRelation)) {
+ Project(resolvedSchema, scanOperation)
+ } else {
+ scanOperation
+ }
+ applyFiltersToPlan(unfilteredPlan, tableSchema, resolvedSchema, ff.getRequiredFilters)
+ }
+
+ /**
+ * adapted from https://github.com/apache/spark/blob/20df062d85e80422a55afae80ddbf2060f26516c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/StructFilters.scala
+ */
+ def applyFiltersToPlan(plan: LogicalPlan, tableSchema: StructType, resolvedSchema: Seq[Attribute], filters: Seq[org.apache.spark.sql.sources.Filter]): LogicalPlan = {
+
+ def filterToExpression(
+ filter: sources.Filter,
+ toRef: String => Option[NamedExpression]): Option[Expression] = {
+ def zipAttributeAndValue(name: String, value: Any): Option[(NamedExpression, Literal)] = {
+ zip(toRef(name), toLiteral(value))
+ }
+
+ def translate(filter: sources.Filter): Option[Expression] = filter match {
+ case sources.And(left, right) =>
+ zip(translate(left), translate(right)).map(And.tupled)
+ case sources.Or(left, right) =>
+ zip(translate(left), translate(right)).map(Or.tupled)
+ case sources.Not(child) =>
+ translate(child).map(Not)
+ case sources.EqualTo(attribute, value) =>
+ zipAttributeAndValue(attribute, value).map(EqualTo.tupled)
+ case sources.EqualNullSafe(attribute, value) =>
+ zipAttributeAndValue(attribute, value).map(EqualNullSafe.tupled)
+ case sources.IsNull(attribute) =>
+ toRef(attribute).map(IsNull)
+ case sources.IsNotNull(attribute) =>
+ toRef(attribute).map(IsNotNull)
+ case sources.In(attribute, values) =>
+ val literals = values.toSeq.flatMap(toLiteral)
+ if (literals.length == values.length) {
+ toRef(attribute).map(In(_, literals))
+ } else {
+ None
+ }
+ case sources.GreaterThan(attribute, value) =>
+ zipAttributeAndValue(attribute, value).map(GreaterThan.tupled)
+ case sources.GreaterThanOrEqual(attribute, value) =>
+ zipAttributeAndValue(attribute, value).map(GreaterThanOrEqual.tupled)
+ case sources.LessThan(attribute, value) =>
+ zipAttributeAndValue(attribute, value).map(LessThan.tupled)
+ case sources.LessThanOrEqual(attribute, value) =>
+ zipAttributeAndValue(attribute, value).map(LessThanOrEqual.tupled)
+ case sources.StringContains(attribute, value) =>
+ zipAttributeAndValue(attribute, value).map(Contains.tupled)
+ case sources.StringStartsWith(attribute, value) =>
+ zipAttributeAndValue(attribute, value).map(StartsWith.tupled)
+ case sources.StringEndsWith(attribute, value) =>
+ zipAttributeAndValue(attribute, value).map(EndsWith.tupled)
+ /* Not supported in spark2. If needed, we will need to create separate spark 2 and 3 implementations
+ case sources.AlwaysTrue() =>
+ Some(Literal(true, BooleanType))
+ case sources.AlwaysFalse() =>
+ Some(Literal(false, BooleanType))
+ */
+ }
+
+ translate(filter)
+ }
+
+ def zip[A, B](a: Option[A], b: Option[B]): Option[(A, B)] = {
+ a.zip(b).headOption
+ }
+
+ def toLiteral(value: Any): Option[Literal] = {
+ Try(Literal(value)).toOption
+ }
+
+ def toRef(attr: String): Option[NamedExpression] = {
+ tableSchema.getFieldIndex(attr).map { index =>
+ resolvedSchema(index)
+ }
+ }
+
+ val expressionFilters = filters.map(f => filterToExpression(f, n => toRef(n)).get)
+ if (expressionFilters.nonEmpty) {
+ Filter(expressionFilters.reduceLeft(And), plan)
+ } else {
+ plan
+ }
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 3565c70..57fdaf8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -47,6 +47,7 @@
// Used so that the planner only projects once and does not stack overflow
var isProjected: Boolean = false
+ def getRequiredFilters: Seq[Filter]
}
/**
@@ -65,6 +66,8 @@
requiredFilters: Seq[Filter]
) extends ParquetFileFormat with SparkAdapterSupport with HoodieFormatTrait {
+ def getRequiredFilters: Seq[Filter] = requiredFilters
+
/**
* Support batch needs to remain consistent, even if one side of a bootstrap merge can support
* while the other side can't
@@ -104,8 +107,8 @@
val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
val (baseFileReader, preMergeBaseFileReader, readerMaps, cdcFileReader) = buildFileReaders(
- spark, dataSchema, partitionSchema, if (isIncremental) requiredSchemaWithMandatory else requiredSchema,
- filters, options, augmentedHadoopConf, requiredSchemaWithMandatory, requiredWithoutMeta, requiredMeta)
+ spark, dataSchema, partitionSchema, requiredSchema, filters, options, augmentedHadoopConf,
+ requiredSchemaWithMandatory, requiredWithoutMeta, requiredMeta)
val requestedAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema, sanitizedTableName)
val dataAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName)
@@ -242,25 +245,20 @@
metaFields.find(f => f.name == name)
}
- // If not MergeOnRead or if projection is compatible
- if (isIncremental) {
- StructType(dataSchema.toArray ++ partitionSchema.fields)
- } else {
- val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
- for (field <- mandatoryFields) {
- if (requiredSchema.getFieldIndex(field).isEmpty) {
- // Support for nested fields
- val fieldParts = field.split("\\.")
- val fieldToAdd = findNestedField(dataSchema, fieldParts)
- .orElse(findNestedField(partitionSchema, fieldParts))
- .orElse(findMetaField(field))
- .getOrElse(throw new IllegalArgumentException(s"Field $field does not exist in the table schema"))
- added.append(fieldToAdd)
- }
+ val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
+ for (field <- mandatoryFields) {
+ if (requiredSchema.getFieldIndex(field).isEmpty) {
+ // Support for nested fields
+ val fieldParts = field.split("\\.")
+ val fieldToAdd = findNestedField(dataSchema, fieldParts)
+ .orElse(findNestedField(partitionSchema, fieldParts))
+ .orElse(findMetaField(field))
+ .getOrElse(throw new IllegalArgumentException(s"Field $field does not exist in the table schema"))
+ added.append(fieldToAdd)
}
- val addedFields = StructType(added.toArray)
- StructType(requiredSchema.toArray ++ addedFields.fields)
}
+ val addedFields = StructType(added.toArray)
+ StructType(requiredSchema.toArray ++ addedFields.fields)
}
protected def buildFileReaders(sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index 44381a5..0751d99 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -56,6 +56,8 @@
requiredFilters: Seq[Filter]
) extends ParquetFileFormat with SparkAdapterSupport with HoodieFormatTrait {
+ def getRequiredFilters: Seq[Filter] = requiredFilters
+
override def isSplitable(sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean = {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 9d40140..eb69f2b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -281,7 +281,7 @@
ut.copy(table = relation)
case logicalPlan: LogicalPlan if logicalPlan.resolved =>
- sparkAdapter.getCatalystPlanUtils.applyNewHoodieParquetFileFormatProjection(logicalPlan)
+ sparkAdapter.getCatalystPlanUtils.maybeApplyForNewFileFormat(logicalPlan)
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
index b0454f7..a7ef56d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
@@ -31,6 +31,7 @@
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.params.ParameterizedTest;
@@ -59,6 +60,11 @@
public class TestGlobalIndexEnableUpdatePartitions extends SparkClientFunctionalTestHarness {
+ @Override
+ public SparkConf conf() {
+ return conf(SparkClientFunctionalTestHarness.getSparkSqlConf());
+ }
+
private static Stream<Arguments> getTableTypeAndIndexType() {
return Stream.of(
Arguments.of(COPY_ON_WRITE, GLOBAL_SIMPLE),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
index 75a7026..efcc935 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
@@ -127,12 +127,12 @@
runIncrementalQueryAndCompare(startArchivedCommitTs, endArchivedCommitTs, 1, true)
// Test start commit is archived, end commit is not archived
- shouldThrowIfFallbackIsFalse(tableType,
+ shouldThrowIfFallbackIsFalse(
() => runIncrementalQueryAndCompare(startArchivedCommitTs, endUnarchivedCommitTs, nArchivedInstants + 1, false))
runIncrementalQueryAndCompare(startArchivedCommitTs, endUnarchivedCommitTs, nArchivedInstants + 1, true)
// Test both start commit and end commits are not archived but got cleaned
- shouldThrowIfFallbackIsFalse(tableType,
+ shouldThrowIfFallbackIsFalse(
() => runIncrementalQueryAndCompare(startUnarchivedCommitTs, endUnarchivedCommitTs, 1, false))
runIncrementalQueryAndCompare(startUnarchivedCommitTs, endUnarchivedCommitTs, 1, true)
@@ -169,22 +169,13 @@
assertEquals(perBatchSize * batchNum, hoodieIncViewDF.count())
}
- private def shouldThrowIfFallbackIsFalse(tableType: HoodieTableType, fn: () => Unit): Unit = {
+ private def shouldThrowIfFallbackIsFalse(fn: () => Unit): Unit = {
val msg = "Should fail with Path does not exist"
- tableType match {
- case HoodieTableType.COPY_ON_WRITE =>
- assertThrows(classOf[HoodieIncrementalPathNotFoundException], new Executable {
- override def execute(): Unit = {
- fn()
- }
- }, msg)
- case HoodieTableType.MERGE_ON_READ =>
- val exp = assertThrows(classOf[SparkException], new Executable {
- override def execute(): Unit = {
- fn()
- }
- }, msg)
- assertTrue(exp.getMessage.contains("FileNotFoundException"))
- }
+ val exp = assertThrows(classOf[SparkException], new Executable {
+ override def execute(): Unit = {
+ fn()
+ }
+ }, msg)
+ assertTrue(exp.getMessage.contains("FileNotFoundException"))
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 3f64e24..682a118 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -27,7 +27,9 @@
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
@@ -37,6 +39,8 @@
class TestSparkDataSource extends SparkClientFunctionalTestHarness {
+ override def conf: SparkConf = conf(getSparkSqlConf)
+
val parallelism: Integer = 4
val commonOpts: Map[String, String] = Map(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index b2c7333..ac0afd6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -50,14 +50,15 @@
| id int,
| name string,
| price double,
- | ts long
+ | ts long,
+ | partition long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
- | partitioned by(ts)
+ | partitioned by(partition)
| location '$basePath'
""".stripMargin)
// disable automatic inline compaction so that HoodieDataSourceHelpers.allCompletedCommitsCompactions
@@ -65,16 +66,16 @@
spark.sql("set hoodie.compact.inline=false")
spark.sql("set hoodie.compact.schedule.inline=false")
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1001)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1002)")
val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, Option(tableName))
// Generate the first clustering plan
val firstScheduleInstant = client.createNewInstantTime()
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
// Generate the second clustering plan
- spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003, 1003)")
val secondScheduleInstant = client.createNewInstantTime()
client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty())
checkAnswer(s"call show_clustering('$tableName')")(
@@ -85,9 +86,9 @@
// Do clustering for all clustering plan generated above, and no new clustering
// instant will be generated because of there is no commit after the second
// clustering plan generated
- checkAnswer(s"call run_clustering(table => '$tableName', order => 'ts', show_involved_partition => true)")(
- Seq(secondScheduleInstant, 1, HoodieInstant.State.COMPLETED.name(), "ts=1003"),
- Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001,ts=1002")
+ checkAnswer(s"call run_clustering(table => '$tableName', order => 'partition', show_involved_partition => true)")(
+ Seq(secondScheduleInstant, 1, HoodieInstant.State.COMPLETED.name(), "partition=1003"),
+ Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "partition=1000,partition=1001,partition=1002")
)
// No new commits
@@ -102,11 +103,11 @@
.toSeq
assertResult(2)(finishedClustering.size)
- checkAnswer(s"select id, name, price, ts from $tableName order by id")(
- Seq(1, "a1", 10.0, 1000),
- Seq(2, "a2", 10.0, 1001),
- Seq(3, "a3", 10.0, 1002),
- Seq(4, "a4", 10.0, 1003)
+ checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1000, 1000),
+ Seq(2, "a2", 10.0, 1001, 1001),
+ Seq(3, "a3", 10.0, 1002, 1002),
+ Seq(4, "a4", 10.0, 1003, 1003)
)
// After clustering there should be no pending clustering and all clustering instants should be completed
@@ -116,9 +117,9 @@
)
// Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
- spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
- spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)")
- spark.sql(s"call run_clustering(table => '$tableName', order => 'ts', show_involved_partition => true)").show()
+ spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004, 1004)")
+ spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005, 1005)")
+ spark.sql(s"call run_clustering(table => '$tableName', order => 'partition', show_involved_partition => true)").show()
val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
.findInstantsAfter(secondScheduleInstant)
@@ -129,13 +130,13 @@
// Should have a new replace commit after the second clustering command.
assertResult(1)(thirdClusteringInstant.size)
- checkAnswer(s"select id, name, price, ts from $tableName order by id")(
- Seq(1, "a1", 10.0, 1000),
- Seq(2, "a2", 10.0, 1001),
- Seq(3, "a3", 10.0, 1002),
- Seq(4, "a4", 10.0, 1003),
- Seq(5, "a5", 10.0, 1004),
- Seq(6, "a6", 10.0, 1005)
+ checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1000, 1000),
+ Seq(2, "a2", 10.0, 1001, 1001),
+ Seq(3, "a3", 10.0, 1002, 1002),
+ Seq(4, "a4", 10.0, 1003, 1003),
+ Seq(5, "a5", 10.0, 1004, 1004),
+ Seq(6, "a6", 10.0, 1005, 1005)
)
}
}
@@ -152,39 +153,40 @@
| id int,
| name string,
| price double,
- | ts long
+ | ts long,
+ | partition long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
- | partitioned by(ts)
+ | partitioned by(partition)
| location '$basePath'
""".stripMargin)
spark.sql(s"call run_clustering(path => '$basePath')").show()
checkAnswer(s"call show_clustering(path => '$basePath')")()
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1001)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1002)")
val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, Option(tableName))
// Generate the first clustering plan
val firstScheduleInstant = client.createNewInstantTime()
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
checkAnswer(s"call show_clustering(path => '$basePath', show_involved_partition => true)")(
- Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "ts=1000,ts=1001,ts=1002")
+ Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "partition=1000,partition=1001,partition=1002")
)
// Do clustering for all the clustering plan
- checkAnswer(s"call run_clustering(path => '$basePath', order => 'ts')")(
+ checkAnswer(s"call run_clustering(path => '$basePath', order => 'partition')")(
Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "*")
)
- checkAnswer(s"select id, name, price, ts from $tableName order by id")(
- Seq(1, "a1", 10.0, 1000),
- Seq(2, "a2", 10.0, 1001),
- Seq(3, "a3", 10.0, 1002)
+ checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1000, 1000),
+ Seq(2, "a2", 10.0, 1001, 1001),
+ Seq(3, "a3", 10.0, 1002, 1002)
)
val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
@@ -199,20 +201,20 @@
assertResult(1)(finishedClustering.size)
// Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
- spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
- spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
- val resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L', show_involved_partition => true)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003, 1003)")
+ spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004, 1004)")
+ val resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'partition >= 1003L', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(resultA.length)
- assertResult("ts=1003,ts=1004")(resultA(0)(3))
+ assertResult("partition=1003,partition=1004")(resultA(0)(3))
- checkAnswer(s"select id, name, price, ts from $tableName order by id")(
- Seq(1, "a1", 10.0, 1000),
- Seq(2, "a2", 10.0, 1001),
- Seq(3, "a3", 10.0, 1002),
- Seq(4, "a4", 10.0, 1003),
- Seq(5, "a5", 10.0, 1004)
+ checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1000, 1000),
+ Seq(2, "a2", 10.0, 1001, 1001),
+ Seq(3, "a3", 10.0, 1002, 1002),
+ Seq(4, "a4", 10.0, 1003, 1003),
+ Seq(5, "a5", 10.0, 1004, 1004)
)
finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -236,14 +238,15 @@
| id int,
| name string,
| price double,
- | ts long
+ | ts long,
+ | partition long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
- | partitioned by(ts)
+ | partitioned by(partition)
| location '$basePath'
""".stripMargin)
@@ -253,20 +256,20 @@
var resultA: Array[Seq[Any]] = Array.empty
{
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1001)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1002)")
checkException(
- s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L and id = 10', order => 'ts')"
+ s"call run_clustering(table => '$tableName', predicate => 'partition <= 1001L and id = 10', order => 'partition')"
)("Only partition predicates are allowed")
// Do clustering table with partition predicate
- resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts', show_involved_partition => true)")
+ resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'partition <= 1001L', order => 'partition', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(resultA.length)
- assertResult("ts=1000,ts=1001")(resultA(0)(3))
+ assertResult("partition=1000,partition=1001")(resultA(0)(3))
// There is 1 completed clustering instant
val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -285,13 +288,13 @@
// All clustering instants are completed
checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")(
- Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001")
+ Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "partition=1000,partition=1001")
)
- checkAnswer(s"select id, name, price, ts from $tableName order by id")(
- Seq(1, "a1", 10.0, 1000),
- Seq(2, "a2", 10.0, 1001),
- Seq(3, "a3", 10.0, 1002)
+ checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1000, 1000),
+ Seq(2, "a2", 10.0, 1001, 1001),
+ Seq(3, "a3", 10.0, 1002, 1002)
)
}
@@ -299,20 +302,20 @@
var resultB: Array[Seq[Any]] = Array.empty
{
- spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
- spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
- spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003, 1003)")
+ spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004, 1004)")
+ spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005, 1005)")
checkException(
- s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L and id = 10', order => 'ts')"
+ s"call run_clustering(table => '$tableName', predicate => 'partition > 1001L and partition <= 1005L and id = 10', order => 'partition')"
)("Only partition predicates are allowed")
// Do clustering table with partition predicate
- resultB = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts', show_involved_partition => true)")
+ resultB = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'partition > 1001L and partition <= 1005L', order => 'partition', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(resultB.length)
- assertResult("ts=1002,ts=1003,ts=1004,ts=1005")(resultB(0)(3))
+ assertResult("partition=1002,partition=1003,partition=1004,partition=1005")(resultB(0)(3))
// There are 2 completed clustering instants
val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -330,17 +333,17 @@
// All clustering instants are completed
checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")(
- Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001"),
- Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1002,ts=1003,ts=1004,ts=1005")
+ Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "partition=1000,partition=1001"),
+ Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "partition=1002,partition=1003,partition=1004,partition=1005")
)
- checkAnswer(s"select id, name, price, ts from $tableName order by id")(
- Seq(1, "a1", 10.0, 1000),
- Seq(2, "a2", 10.0, 1001),
- Seq(3, "a3", 10.0, 1002),
- Seq(4, "a4", 10.0, 1003),
- Seq(5, "a5", 10.0, 1004),
- Seq(6, "a6", 10.0, 1005)
+ checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1000, 1000),
+ Seq(2, "a2", 10.0, 1001, 1001),
+ Seq(3, "a3", 10.0, 1002, 1002),
+ Seq(4, "a4", 10.0, 1003, 1003),
+ Seq(5, "a5", 10.0, 1004, 1004),
+ Seq(6, "a6", 10.0, 1005, 1005)
)
}
@@ -348,21 +351,21 @@
var resultC: Array[Seq[Any]] = Array.empty
{
- spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006)")
- spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007)")
- spark.sql(s"insert into $tableName values(9, 'a9', 10, 1008)")
- spark.sql(s"insert into $tableName values(10, 'a10', 10, 1009)")
+ spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006, 1006)")
+ spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007, 1007)")
+ spark.sql(s"insert into $tableName values(9, 'a9', 10, 1008, 1008)")
+ spark.sql(s"insert into $tableName values(10, 'a10', 10, 1009, 1009)")
checkException(
- s"call run_clustering(table => '$tableName', predicate => 'ts < 1007L or ts >= 1008L or id = 10', order => 'ts')"
+ s"call run_clustering(table => '$tableName', predicate => 'partition < 1007L or partition >= 1008L or id = 10', order => 'partition')"
)("Only partition predicates are allowed")
// Do clustering table with partition predicate
- resultC = spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts', show_involved_partition => true)")
+ resultC = spark.sql(s"call run_clustering(table => '$tableName', predicate => '(partition >= 1006L and partition < 1008L) or partition >= 1009L', order => 'partition', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(resultC.length)
- assertResult("ts=1006,ts=1007,ts=1009")(resultC(0)(3))
+ assertResult("partition=1006,partition=1007,partition=1009")(resultC(0)(3))
// There are 3 completed clustering instants
val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -380,28 +383,28 @@
// All clustering instants are completed
checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")(
- Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001"),
- Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1002,ts=1003,ts=1004,ts=1005"),
- Seq(resultC(0).head, resultC(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1006,ts=1007,ts=1009")
+ Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "partition=1000,partition=1001"),
+ Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "partition=1002,partition=1003,partition=1004,partition=1005"),
+ Seq(resultC(0).head, resultC(0)(1), HoodieInstant.State.COMPLETED.name(), "partition=1006,partition=1007,partition=1009")
)
- checkAnswer(s"select id, name, price, ts from $tableName order by id")(
- Seq(1, "a1", 10.0, 1000),
- Seq(2, "a2", 10.0, 1001),
- Seq(3, "a3", 10.0, 1002),
- Seq(4, "a4", 10.0, 1003),
- Seq(5, "a5", 10.0, 1004),
- Seq(6, "a6", 10.0, 1005),
- Seq(7, "a7", 10.0, 1006),
- Seq(8, "a8", 10.0, 1007),
- Seq(9, "a9", 10.0, 1008),
- Seq(10, "a10", 10.0, 1009)
+ checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1000, 1000),
+ Seq(2, "a2", 10.0, 1001, 1001),
+ Seq(3, "a3", 10.0, 1002, 1002),
+ Seq(4, "a4", 10.0, 1003, 1003),
+ Seq(5, "a5", 10.0, 1004, 1004),
+ Seq(6, "a6", 10.0, 1005, 1005),
+ Seq(7, "a7", 10.0, 1006, 1006),
+ Seq(8, "a8", 10.0, 1007, 1007),
+ Seq(9, "a9", 10.0, 1008, 1008),
+ Seq(10, "a10", 10.0, 1009, 1009)
)
}
// Test partition pruning with invalid predicates
{
- val resultD = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1111L', order => 'ts', show_involved_partition => true)")
+ val resultD = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'partition > 1111L', order => 'partition', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(0)(resultD.length)
@@ -460,7 +463,7 @@
// test with operator schedule
checkExceptionContain(
- s"call run_clustering(table => '$tableName', instants => '000000', op => 'schedule')"
+ s"call run_clustering(table => '$tableName', instants => '000000', op => 'schedule')"
)("specific instants only can be used in 'execute' op or not specific op")
// test with operator scheduleAndExecute
@@ -628,56 +631,57 @@
| id int,
| name string,
| price double,
- | ts long
+ | ts long,
+ | partition long
|) using hudi
| options (
| primaryKey ='id',
| type = 'cow',
| preCombineField = 'ts'
| )
- | partitioned by(ts)
+ | partitioned by(partition)
| location '$basePath'
""".stripMargin)
// Test clustering with PARTITION_SELECTED config set, choose only a part of all partitions to schedule
{
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010, 1010)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010, 1010)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011, 1011)")
// Do
val result = spark.sql(s"call run_clustering(table => '$tableName', " +
- s"selected_partitions => 'ts=1010', show_involved_partition => true)")
+ s"selected_partitions => 'partition=1010', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(result.length)
- assertResult("ts=1010")(result(0)(3))
+ assertResult("partition=1010")(result(0)(3))
- checkAnswer(s"select id, name, price, ts from $tableName order by id")(
- Seq(1, "a1", 10.0, 1010),
- Seq(2, "a2", 10.0, 1010),
- Seq(3, "a3", 10.0, 1011)
+ checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1010, 1010),
+ Seq(2, "a2", 10.0, 1010, 1010),
+ Seq(3, "a3", 10.0, 1011, 1011)
)
}
// Test clustering with PARTITION_SELECTED, choose all partitions to schedule
{
- spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
- spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
- spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010, 1010)")
+ spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011, 1011)")
+ spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012, 1012)")
val result = spark.sql(s"call run_clustering(table => '$tableName', " +
- s"selected_partitions => 'ts=1010,ts=1011,ts=1012', show_involved_partition => true)")
+ s"selected_partitions => 'partition=1010,partition=1011,partition=1012', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(result.length)
- assertResult("ts=1010,ts=1011,ts=1012")(result(0)(3))
+ assertResult("partition=1010,partition=1011,partition=1012")(result(0)(3))
- checkAnswer(s"select id, name, price, ts from $tableName order by id")(
- Seq(1, "a1", 10.0, 1010),
- Seq(2, "a2", 10.0, 1010),
- Seq(3, "a3", 10.0, 1011),
- Seq(4, "a4", 10.0, 1010),
- Seq(5, "a5", 10.0, 1011),
- Seq(6, "a6", 10.0, 1012)
+ checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1010, 1010),
+ Seq(2, "a2", 10.0, 1010, 1010),
+ Seq(3, "a3", 10.0, 1011, 1011),
+ Seq(4, "a4", 10.0, 1010, 1010),
+ Seq(5, "a5", 10.0, 1011, 1011),
+ Seq(6, "a6", 10.0, 1012, 1012)
)
}
}
@@ -693,7 +697,8 @@
| id int,
| name string,
| price double,
- | ts long
+ | ts long,
+ | partition long
|) using hudi
| tblproperties (
| primaryKey ='id',
@@ -702,12 +707,12 @@
| hoodie.index.type = 'BUCKET',
| hoodie.bucket.index.hash.field = 'id'
| )
- | partitioned by (ts)
+ | partitioned by (partition)
| location '$basePath'
""".stripMargin)
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010, 1010)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010, 1010)")
checkExceptionContain(s"call run_clustering(table => '$tableName')")(
"Executor SparkExecuteClusteringCommitActionExecutor is not compatible with table layout HoodieSimpleBucketLayout")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
index 10ee950..c9a8c8e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
@@ -31,9 +31,10 @@
| id int,
| name string,
| price double,
- | ts long
+ | ts long,
+ | partition int
|) using hudi
- | partitioned by (ts)
+ | partitioned by (partition)
| location '$tablePath'
| tblproperties (
| type = 'mor',
@@ -42,8 +43,8 @@
| )
""".stripMargin)
// insert data to table
- spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
- spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500, 1500")
spark.sql(s"update $tableName set name = 'b1', price = 100 where id = 1")
// Check required fields
@@ -52,7 +53,7 @@
// collect result for table
val result = spark.sql(
- s"""call show_logfile_metadata(table => '$tableName', log_file_path_pattern => '$tablePath/ts=1000/*.log.*')""".stripMargin).collect()
+ s"""call show_logfile_metadata(table => '$tableName', log_file_path_pattern => '$tablePath/partition=1000/*.log.*')""".stripMargin).collect()
assertResult(1) {
result.length
}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 7458d16..9f3a5ce0 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -98,22 +98,21 @@
Join(left, right, joinType, condition)
}
- override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+ override def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean = {
+ val thisOutput = a.output
+ val otherOutput = b.output
+ thisOutput.length == otherOutput.length && thisOutput.zip(otherOutput).forall {
+ case (a1, a2) => a1.semanticEquals(a2)
+ }
+ }
+
+ override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
plan match {
case physicalOperation@PhysicalOperation(_, _,
- logicalRelation@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait] && !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
- val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait]
- ff.isProjected = true
- val tableSchema = fs.location match {
- case index: HoodieCDCFileIndex => index.cdcRelation.schema
- case index: SparkHoodieTableFileIndex => index.schema
- }
- val resolvedSchema = logicalRelation.resolve(tableSchema, fs.sparkSession.sessionState.analyzer.resolver)
- if (!fs.partitionSchema.fields.isEmpty) {
- Project(resolvedSchema, physicalOperation)
- } else {
- physicalOperation
- }
+ logicalRelation@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+ if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+ && !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
+ FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(physicalOperation, logicalRelation, fs)
case _ => plan
}
}
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
index f5757e1..399e053 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
@@ -17,17 +17,15 @@
package org.apache.spark.sql
-import org.apache.hudi.{HoodieCDCFileIndex, SparkAdapterSupport, SparkHoodieTableFileIndex}
+import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableOutputResolver
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LeafNode, LogicalPlan}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.{CreateTableLikeCommand, ExplainCommand}
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
-import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, ParquetFileFormat}
import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -82,25 +80,14 @@
override def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, condition: Option[Expression], hint: String): LogicalPlan = {
Join(left, right, joinType, condition, JoinHint.NONE)
}
+
+ override def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean = {
+ a.sameOutput(b)
+ }
}
object HoodieSpark3CatalystPlanUtils extends SparkAdapterSupport {
- def applyNewFileFormatChanges(scanOperation: LogicalPlan, logicalRelation: LogicalPlan, fs: HadoopFsRelation): LogicalPlan = {
- val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait]
- ff.isProjected = true
- val tableSchema = fs.location match {
- case index: HoodieCDCFileIndex => index.cdcRelation.schema
- case index: SparkHoodieTableFileIndex => index.schema
- }
- val resolvedSchema = logicalRelation.resolve(tableSchema, fs.sparkSession.sessionState.analyzer.resolver)
- if (!fs.partitionSchema.fields.isEmpty && scanOperation.sameOutput(logicalRelation)) {
- Project(resolvedSchema, scanOperation)
- } else {
- scanOperation
- }
- }
-
/**
* This is an extractor to accommodate for [[ResolvedTable]] signature change in Spark 3.2
*/
diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
index 34a1e08..dbe68bb 100644
--- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
@@ -45,13 +45,13 @@
}
}
- override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+ override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
plan match {
case s@ScanOperation(_, _,
l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
&& !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
- HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+ FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
case _ => plan
}
}
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index dd3ac53..765a9b0 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -46,13 +46,13 @@
}
}
- override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+ override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
plan match {
case s@ScanOperation(_, _,
l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
&& !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
- HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+ FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
case _ => plan
}
}
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index 7a30aeb..8562ca1 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -48,13 +48,13 @@
}
}
- override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+ override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
plan match {
case s@ScanOperation(_, _,
l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
&& !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
- HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+ FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
case _ => plan
}
}
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index 9bcc40a..54dbaa0 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -46,13 +46,13 @@
}
}
- override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+ override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
plan match {
case s@ScanOperation(_, _,
l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
&& !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
- HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+ FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
case _ => plan
}
}
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index e49adeb..2c50d21 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -45,13 +45,13 @@
}
}
- override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+ override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
plan match {
case s@ScanOperation(_, _, _,
l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
&& !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
- HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+ FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
case _ => plan
}
}
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
index eac3153..b95ee94 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
@@ -45,13 +45,13 @@
}
}
- override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+ override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
plan match {
case s@ScanOperation(_, _, _,
l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
&& !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
- HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+ FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
case _ => plan
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 03208a0..b39c5f2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -60,7 +60,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieIncrementalPathNotFoundException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncClient;
@@ -105,6 +104,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
@@ -2390,7 +2390,7 @@
insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
//No change as this fails with Path not exist error
- assertThrows(HoodieIncrementalPathNotFoundException.class, () -> new HoodieDeltaStreamer(downstreamCfg, jsc).sync());
+ assertThrows(SparkException.class, () -> new HoodieDeltaStreamer(downstreamCfg, jsc).sync());
assertRecordCount(1000, downstreamTableBasePath, sqlContext);
if (downstreamCfg.configs == null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index 1b534c22..b9e20fb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -46,6 +46,7 @@
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -72,6 +73,11 @@
public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
+ @Override
+ public SparkConf conf() {
+ return conf(SparkClientFunctionalTestHarness.getSparkSqlConf());
+ }
+
private HoodieTestDataGenerator dataGen;
private HoodieTableMetaClient metaClient;
private HoodieTableType tableType = COPY_ON_WRITE;