[HUDI-1591] Implement Spark's FileIndex for Hudi to support queries via Hudi DataSource using non-globbed table path and partition pruning (#2651)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index 724cabd..3b927c9b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -44,7 +44,7 @@
public class CustomAvroKeyGenerator extends BaseKeyGenerator {
private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
- private static final String SPLIT_REGEX = ":";
+ public static final String SPLIT_REGEX = ":";
/**
* Used as a part of config in CustomKeyGenerator.java.
@@ -117,8 +117,4 @@
public String getDefaultPartitionPathSeparator() {
return DEFAULT_PARTITION_PATH_SEPARATOR;
}
-
- public String getSplitRegex() {
- return SPLIT_REGEX;
- }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index 77896d2..a2a3012 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -90,7 +90,7 @@
return "";
}
for (String field : getPartitionPathFields()) {
- String[] fieldWithType = field.split(customAvroKeyGenerator.getSplitRegex());
+ String[] fieldWithType = field.split(customAvroKeyGenerator.SPLIT_REGEX);
if (fieldWithType.length != 2) {
throw new HoodieKeyGeneratorException("Unable to find field names for partition path in proper format");
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
new file mode 100644
index 0000000..fc2275b
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.TimeZone
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.execution.datasources.PartitioningUtils.PartitionValues
+import org.apache.spark.sql.types.DataType
+
+trait SparkParsePartitionUtil extends Serializable {
+
+ def parsePartition(
+ path: Path,
+ typeInference: Boolean,
+ basePaths: Set[Path],
+ userSpecifiedDataTypes: Map[String, DataType],
+ timeZone: TimeZone): Option[PartitionValues]
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index d37c617..9b229a3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -276,6 +276,16 @@
}
}
+ public static FileStatus[] getFilesInPartition(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
+ String basePathStr, Path partitionPath) {
+ try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext,
+ metadataConfig, basePathStr, FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR)) {
+ return tableMetadata.getAllFilesInPartition(partitionPath);
+ } catch (Exception e) {
+ throw new HoodieException("Error get files in partition: " + partitionPath, e);
+ }
+ }
+
public static String getFileExtension(String fullName) {
Objects.requireNonNull(fullName);
String fileName = new File(fullName).getName();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index f519c91..0b36e31 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.table;
+import java.util.Arrays;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
@@ -57,6 +58,7 @@
public static final String HOODIE_TABLE_TYPE_PROP_NAME = "hoodie.table.type";
public static final String HOODIE_TABLE_VERSION_PROP_NAME = "hoodie.table.version";
public static final String HOODIE_TABLE_PRECOMBINE_FIELD = "hoodie.table.precombine.field";
+ public static final String HOODIE_TABLE_PARTITION_COLUMNS = "hoodie.table.partition.columns";
@Deprecated
public static final String HOODIE_RO_FILE_FORMAT_PROP_NAME = "hoodie.table.ro.file.format";
@@ -193,6 +195,14 @@
return props.getProperty(HOODIE_TABLE_PRECOMBINE_FIELD);
}
+ public Option<String[]> getPartitionColumns() {
+ if (props.containsKey(HOODIE_TABLE_PARTITION_COLUMNS)) {
+ return Option.of(Arrays.stream(props.getProperty(HOODIE_TABLE_PARTITION_COLUMNS).split(","))
+ .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[]{}));
+ }
+ return Option.empty();
+ }
+
/**
* Read the payload class for HoodieRecords from the table properties.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 5de3b9a..f4edeb8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -596,6 +596,7 @@
private Integer timelineLayoutVersion;
private String baseFileFormat;
private String preCombineField;
+ private String partitionColumns;
private String bootstrapIndexClass;
private String bootstrapBasePath;
@@ -646,6 +647,11 @@
return this;
}
+ public PropertyBuilder setPartitionColumns(String partitionColumns) {
+ this.partitionColumns = partitionColumns;
+ return this;
+ }
+
public PropertyBuilder setBootstrapIndexClass(String bootstrapIndexClass) {
this.bootstrapIndexClass = bootstrapIndexClass;
return this;
@@ -696,6 +702,9 @@
if (properties.containsKey(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD)) {
setPreCombineField(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD));
}
+ if (properties.containsKey(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS)) {
+ setPartitionColumns(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS));
+ }
return this;
}
@@ -738,6 +747,10 @@
if (null != preCombineField) {
properties.put(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD, preCombineField);
}
+
+ if (null != partitionColumns) {
+ properties.put(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS, partitionColumns);
+ }
return properties;
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 51f32a2..4c76f5f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -68,6 +68,9 @@
val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_PROP
+ val ENABLE_HOODIE_FILE_INDEX = "hoodie.file.index.enable"
+ val DEFAULT_ENABLE_HOODIE_FILE_INDEX = true
+
@Deprecated
val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
@Deprecated
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 3299b8f..0b8234d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -19,14 +19,16 @@
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
-import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
import org.apache.spark.sql.sources._
@@ -79,39 +81,53 @@
val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
- val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
-
- val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+ // Use the HoodieFileIndex only if the 'path' is not globbed.
+ // Or else we use the original way to read hoodie table.
+ val enableFileIndex = optParams.get(ENABLE_HOODIE_FILE_INDEX)
+ .map(_.toBoolean).getOrElse(DEFAULT_ENABLE_HOODIE_FILE_INDEX)
+ val useHoodieFileIndex = enableFileIndex && path.isDefined && !path.get.contains("*") &&
+ !parameters.contains(DataSourceReadOptions.READ_PATHS_OPT_KEY)
+ val globPaths = if (useHoodieFileIndex) {
+ None
+ } else {
+ Some(HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs))
+ }
+ // Get the table base path
+ val tablePath = if (globPaths.isDefined) {
+ DataSourceUtils.getTablePath(fs, globPaths.get.toArray)
+ } else {
+ DataSourceUtils.getTablePath(fs, Array(new Path(path.get)))
+ }
log.info("Obtained hudi table path: " + tablePath)
val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
- log.info("Is bootstrapped table => " + isBootstrappedTable)
+ val tableType = metaClient.getTableType
+ val queryType = parameters(QUERY_TYPE_OPT_KEY)
+ log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType")
- if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
- if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
- if (isBootstrappedTable) {
- // Snapshot query is not supported for Bootstrapped MOR tables
- log.warn("Snapshot query is not supported for Bootstrapped Merge-on-Read tables." +
- " Falling back to Read Optimized query.")
- new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
- } else {
- new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
- }
- } else {
- getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
- }
- } else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
- getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
- } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
- val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
- if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
- new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, metaClient)
- } else {
- new IncrementalRelation(sqlContext, optParams, schema, metaClient)
- }
- } else {
- throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
+ (tableType, queryType, isBootstrappedTable) match {
+ case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
+ (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
+ (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
+ getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, schema, tablePath,
+ readPaths, metaClient)
+
+ case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
+ new IncrementalRelation(sqlContext, parameters, schema, metaClient)
+
+ case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
+ new MergeOnReadSnapshotRelation(sqlContext, parameters, schema, globPaths, metaClient)
+
+ case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
+ new MergeOnReadIncrementalRelation(sqlContext, parameters, schema, metaClient)
+
+ case (_, _, true) =>
+ new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, parameters)
+
+ case (_, _, _) =>
+ throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
+ s"isBootstrappedTable: $isBootstrappedTable ")
}
}
@@ -162,18 +178,28 @@
override def shortName(): String = "hudi"
- private def getBaseFileOnlyView(sqlContext: SQLContext,
+ private def getBaseFileOnlyView(useHoodieFileIndex: Boolean,
+ sqlContext: SQLContext,
optParams: Map[String, String],
schema: StructType,
+ tablePath: String,
extraReadPaths: Seq[String],
- isBootstrappedTable: Boolean,
- globPaths: Seq[Path],
metaClient: HoodieTableMetaClient): BaseRelation = {
- log.warn("Loading Base File Only View.")
+ log.info("Loading Base File Only View with options :" + optParams)
- if (isBootstrappedTable) {
- // For bootstrapped tables, use our custom Spark relation for querying
- new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
+ if (useHoodieFileIndex) {
+
+ val fileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
+ if (schema == null) Option.empty[StructType] else Some(schema),
+ optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))
+
+ HadoopFsRelation(
+ fileIndex,
+ fileIndex.partitionSchema,
+ fileIndex.dataSchema,
+ bucketSpec = None,
+ fileFormat = new ParquetFileFormat,
+ optParams)(sqlContext.sparkSession)
} else {
// this is just effectively RO view only, where `path` can contain a mix of
// non-hoodie/hoodie path files. set the path filter up
@@ -182,7 +208,6 @@
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter])
- log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
// simply return as a regular parquet relation
DataSource.apply(
sparkSession = sqlContext.sparkSession,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index f7415f9..b1ab83a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -26,7 +26,7 @@
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
@@ -46,13 +46,14 @@
*
* @param _sqlContext Spark SQL Context
* @param userSchema User specified schema in the datasource query
- * @param globPaths Globbed paths obtained from the user provided path for querying
+ * @param globPaths The global paths to query. If it not none, read from the globPaths,
+ * else read data from tablePath using HoodiFileIndex.
* @param metaClient Hoodie table meta client
* @param optParams DataSource options passed by the user
*/
class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
val userSchema: StructType,
- val globPaths: Seq[Path],
+ val globPaths: Option[Seq[Path]],
val metaClient: HoodieTableMetaClient,
val optParams: Map[String, String]) extends BaseRelation
with PrunedFilteredScan with Logging {
@@ -156,9 +157,14 @@
def buildFileIndex(): HoodieBootstrapFileIndex = {
logInfo("Building file index..")
- val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths)
- val fileStatuses = inMemoryFileIndex.allFiles()
-
+ val fileStatuses = if (globPaths.isDefined) {
+ // Load files from the global paths if it has defined to be compatible with the original mode
+ val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths.get)
+ inMemoryFileIndex.allFiles()
+ } else { // Load files by the HoodieFileIndex.
+ HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), optParams,
+ FileStatusCache.getOrCreate(sqlContext.sparkSession)).allFiles
+ }
if (fileStatuses.isEmpty) {
throw new HoodieException("No files found for reading in user provided path.")
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
new file mode 100644
index 0000000..61c2f3a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.mutable
+
+/**
+ * A file index which support partition prune for hoodie snapshot and read-optimized query.
+ *
+ * Main steps to get the file list for query:
+ * 1、Load all files and partition values from the table path.
+ * 2、Do the partition prune by the partition filter condition.
+ *
+ * There are 3 cases for this:
+ * 1、If the partition columns size is equal to the actually partition path level, we
+ * read it as partitioned table.(e.g partition column is "dt", the partition path is "2021-03-10")
+ *
+ * 2、If the partition columns size is not equal to the partition path level, but the partition
+ * column size is "1" (e.g. partition column is "dt", but the partition path is "2021/03/10"
+ * who'es directory level is 3).We can still read it as a partitioned table. We will mapping the
+ * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt").
+ *
+ * 3、Else the the partition columns size is not equal to the partition directory level and the
+ * size is great than "1" (e.g. partition column is "dt,hh", the partition path is "2021/03/10/12")
+ * , we read it as a Non-Partitioned table because we cannot know how to mapping the partition
+ * path with the partition columns in this case.
+ *
+ */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ metaClient: HoodieTableMetaClient,
+ schemaSpec: Option[StructType],
+ options: Map[String, String],
+ @transient fileStatusCache: FileStatusCache = NoopCache)
+ extends FileIndex with Logging {
+
+ private val basePath = metaClient.getBasePath
+
+ @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required"))
+ /**
+ * Get the schema of the table.
+ */
+ lazy val schema: StructType = schemaSpec.getOrElse({
+ val schemaUtil = new TableSchemaResolver(metaClient)
+ SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+ .dataType.asInstanceOf[StructType]
+ })
+
+ /**
+ * Get the partition schema from the hoodie.properties.
+ */
+ private lazy val _partitionSchemaFromProperties: StructType = {
+ val tableConfig = metaClient.getTableConfig
+ val partitionColumns = tableConfig.getPartitionColumns
+ val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+
+ if (partitionColumns.isPresent) {
+ val partitionFields = partitionColumns.get().map(column =>
+ nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
+ s"$column' in the schema[${schema.fields.mkString(",")}]")))
+ new StructType(partitionFields)
+ } else { // If the partition columns have not stored in hoodie.properites(the table that was
+ // created earlier), we trait it as a non-partitioned table.
+ logWarning("No partition columns available from hoodie.properties." +
+ " Partition pruning will not work")
+ new StructType()
+ }
+ }
+
+ @transient @volatile private var fileSystemView: HoodieTableFileSystemView = _
+ @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _
+ @transient @volatile private var cachedFileSize: Long = 0L
+ @transient @volatile private var cachedAllPartitionPaths: Seq[PartitionRowPath] = _
+
+ @volatile private var queryAsNonePartitionedTable: Boolean = _
+
+ refresh0()
+
+ override def rootPaths: Seq[Path] = queryPath :: Nil
+
+ override def listFiles(partitionFilters: Seq[Expression],
+ dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+ if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
+ Seq(PartitionDirectory(InternalRow.empty, allFiles))
+ } else {
+ // Prune the partition path by the partition filters
+ val prunedPartitions = prunePartition(cachedAllPartitionPaths, partitionFilters)
+ prunedPartitions.map { partition =>
+ val fileStatues = fileSystemView.getLatestBaseFiles(partition.partitionPath).iterator()
+ .asScala.toSeq
+ .map(_.getFileStatus)
+ PartitionDirectory(partition.values, fileStatues)
+ }
+ }
+ }
+
+ override def inputFiles: Array[String] = {
+ cachedAllInputFiles.map(_.getFileStatus.getPath.toString)
+ }
+
+ override def refresh(): Unit = {
+ fileStatusCache.invalidateAll()
+ refresh0()
+ }
+
+ private def refresh0(): Unit = {
+ val startTime = System.currentTimeMillis()
+ val partitionFiles = loadPartitionPathFiles()
+ val allFiles = partitionFiles.values.reduceOption(_ ++ _)
+ .getOrElse(Array.empty[FileStatus])
+
+ metaClient.reloadActiveTimeline()
+ val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+ fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles)
+ cachedAllInputFiles = fileSystemView.getLatestBaseFiles.iterator().asScala.toArray
+ cachedAllPartitionPaths = partitionFiles.keys.toSeq
+ cachedFileSize = cachedAllInputFiles.map(_.getFileLen).sum
+
+ // If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
+ queryAsNonePartitionedTable = cachedAllPartitionPaths
+ .exists(p => p.values == InternalRow.empty)
+ val flushSpend = System.currentTimeMillis() - startTime
+ logInfo(s"Refresh for table ${metaClient.getTableConfig.getTableName}," +
+ s" spend: $flushSpend ms")
+ }
+
+ override def sizeInBytes: Long = {
+ cachedFileSize
+ }
+
+ override def partitionSchema: StructType = {
+ if (queryAsNonePartitionedTable) {
+ // If we read it as Non-Partitioned table, we should not
+ // return the partition schema.
+ new StructType()
+ } else {
+ _partitionSchemaFromProperties
+ }
+ }
+
+ /**
+ * Get the data schema of the table.
+ * @return
+ */
+ def dataSchema: StructType = {
+ val partitionColumns = partitionSchema.fields.map(_.name).toSet
+ StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name)))
+ }
+
+ def allFiles: Seq[FileStatus] = cachedAllInputFiles.map(_.getFileStatus)
+
+ /**
+ * Prune the partition by the filter.This implementation is fork from
+ * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
+ * @param partitionPaths All the partition paths.
+ * @param predicates The filter condition.
+ * @return The Pruned partition paths.
+ */
+ private def prunePartition(partitionPaths: Seq[PartitionRowPath],
+ predicates: Seq[Expression]): Seq[PartitionRowPath] = {
+
+ val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
+ val partitionPruningPredicates = predicates.filter {
+ _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+ }
+ if (partitionPruningPredicates.nonEmpty) {
+ val predicate = partitionPruningPredicates.reduce(expressions.And)
+
+ val boundPredicate = InterpretedPredicate(predicate.transform {
+ case a: AttributeReference =>
+ val index = partitionSchema.indexWhere(a.name == _.name)
+ BoundReference(index, partitionSchema(index).dataType, nullable = true)
+ })
+
+ val prunedPartitionPaths = partitionPaths.filter {
+ case PartitionRowPath(values, _) => boundPredicate.eval(values)
+ }
+ logInfo(s"Total partition size is: ${partitionPaths.size}," +
+ s" after partition prune size is: ${prunedPartitionPaths.size}")
+ prunedPartitionPaths
+ } else {
+ partitionPaths
+ }
+ }
+
+ /**
+ * Load all partition paths and it's files under the query table path.
+ */
+ private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = {
+ val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+ val properties = new Properties()
+ properties.putAll(options.asJava)
+ val metadataConfig = HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
+
+ val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath)
+ // Load all the partition path from the basePath, and filter by the query partition path.
+ // TODO load files from the queryPartitionPath directly.
+ val partitionPaths = FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, basePath).asScala
+ .filter(_.startsWith(queryPartitionPath))
+
+ val writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath).withProperties(properties).build()
+ val maxListParallelism = writeConfig.getFileListingParallelism
+
+ val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
+ val partitionSchema = _partitionSchemaFromProperties
+ val timeZoneId = CaseInsensitiveMap(options)
+ .get(DateTimeUtils.TIMEZONE_OPTION)
+ .getOrElse(SQLConf.get.sessionLocalTimeZone)
+
+ val sparkParsePartitionUtil = HoodieSparkUtils.createSparkParsePartitionUtil(spark
+ .sessionState.conf)
+ // Convert partition path to PartitionRowPath
+ val partitionRowPaths = partitionPaths.map { partitionPath =>
+ val partitionRow = if (partitionSchema.fields.length == 0) {
+ // This is a non-partitioned table
+ InternalRow.empty
+ } else {
+ val partitionFragments = partitionPath.split("/")
+
+ if (partitionFragments.length != partitionSchema.fields.length &&
+ partitionSchema.fields.length == 1) {
+ // If the partition column size is not equal to the partition fragment size
+ // and the partition column size is 1, we map the whole partition path
+ // to the partition column which can benefit from the partition prune.
+ InternalRow.fromSeq(Seq(UTF8String.fromString(partitionPath)))
+ } else if (partitionFragments.length != partitionSchema.fields.length &&
+ partitionSchema.fields.length > 1) {
+ // If the partition column size is not equal to the partition fragments size
+ // and the partition column size > 1, we do not know how to map the partition
+ // fragments to the partition columns. So we trait it as a Non-Partitioned Table
+ // for the query which do not benefit from the partition prune.
+ logWarning( s"Cannot do the partition prune for table $basePath." +
+ s"The partitionFragments size (${partitionFragments.mkString(",")})" +
+ s" is not equal to the partition columns size(${partitionSchema.fields.mkString(",")})")
+ InternalRow.empty
+ } else { // If partitionSeqs.length == partitionSchema.fields.length
+
+ // Append partition name to the partition value if the
+ // HIVE_STYLE_PARTITIONING_OPT_KEY is disable.
+ // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
+ val partitionWithName =
+ partitionFragments.zip(partitionSchema).map {
+ case (partition, field) =>
+ if (partition.indexOf("=") == -1) {
+ s"${field.name}=$partition"
+ } else {
+ partition
+ }
+ }.mkString("/")
+ val pathWithPartitionName = new Path(basePath, partitionWithName)
+ val partitionDataTypes = partitionSchema.fields.map(f => f.name -> f.dataType).toMap
+ val partitionValues = sparkParsePartitionUtil.parsePartition(pathWithPartitionName,
+ typeInference = false, Set(new Path(basePath)), partitionDataTypes,
+ DateTimeUtils.getTimeZone(timeZoneId))
+
+ // Convert partitionValues to InternalRow
+ partitionValues.map(_.literals.map(_.value))
+ .map(InternalRow.fromSeq)
+ .getOrElse(InternalRow.empty)
+ }
+ }
+ PartitionRowPath(partitionRow, partitionPath)
+ }
+
+ // List files in all of the partition path.
+ val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]()
+ val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]()
+ // Fetch from the FileStatusCache
+ partitionRowPaths.foreach { partitionRowPath =>
+ fileStatusCache.getLeafFiles(partitionRowPath.fullPartitionPath(basePath)) match {
+ case Some(filesInPartition) =>
+ cachePartitionToFiles.put(partitionRowPath, filesInPartition)
+
+ case None => pathToFetch.append(partitionRowPath)
+ }
+ }
+ // Fetch the rest from the file system.
+ val fetchedPartition2Files =
+ spark.sparkContext.parallelize(pathToFetch, Math.min(pathToFetch.size, maxListParallelism))
+ .map { partitionRowPath =>
+ // Here we use a LocalEngineContext to get the files in the partition.
+ // We can do this because the TableMetadata.getAllFilesInPartition only rely on the
+ // hadoopConf of the EngineContext.
+ val engineContext = new HoodieLocalEngineContext(serializableConf.get())
+ val filesInPartition = FSUtils.getFilesInPartition(engineContext, metadataConfig,
+ basePath, partitionRowPath.fullPartitionPath(basePath))
+ (partitionRowPath, filesInPartition)
+ }.collect().map(f => f._1 -> f._2).toMap
+
+ // Update the fileStatusCache
+ fetchedPartition2Files.foreach {
+ case (partitionRowPath, filesInPartition) =>
+ fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition)
+ }
+ cachePartitionToFiles.toMap ++ fetchedPartition2Files
+ }
+
+ /**
+ * Represent a partition path.
+ * e.g. PartitionPath(InternalRow("2021","02","01"), "2021/02/01"))
+ * @param values The partition values of this partition path.
+ * @param partitionPath The partition path string.
+ */
+ case class PartitionRowPath(values: InternalRow, partitionPath: String) {
+ override def equals(other: Any): Boolean = other match {
+ case PartitionRowPath(_, otherPath) => partitionPath == otherPath
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ partitionPath.hashCode
+ }
+
+ def fullPartitionPath(basePath: String): Path = {
+ if (partitionPath.isEmpty) {
+ new Path(basePath) // This is a non-partition path
+ } else {
+ new Path(basePath, partitionPath)
+ }
+ }
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 94d07b9..5b87278 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -100,6 +100,7 @@
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
var tableConfig = getHoodieTableConfig(sparkContext, path.get, hoodieTableConfigOpt)
+ val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
@@ -112,12 +113,15 @@
val archiveLogFolder = parameters.getOrElse(
HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
+ val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator)
+
val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
.setTableName(tblName)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(parameters(PAYLOAD_CLASS_OPT_KEY))
.setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
+ .setPartitionColumns(partitionColumns)
.initTable(sparkContext.hadoopConfiguration, path.get)
tableConfig = tableMetaClient.getTableConfig
}
@@ -146,7 +150,6 @@
log.info(s"Registered avro schema : ${schema.toString(true)}")
// Convert to RDD[HoodieRecord]
- val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean || operation.equals(WriteOperationType.UPSERT);
val hoodieAllIncomingRecords = genericRecords.map(gr => {
@@ -193,7 +196,6 @@
classOf[org.apache.avro.Schema]))
// Convert to RDD[HoodieKey]
- val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace)
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
@@ -283,6 +285,7 @@
if (!tableExists) {
val archiveLogFolder = parameters.getOrElse(
HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
+ val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.valueOf(tableType))
.setTableName(tableName)
@@ -291,6 +294,7 @@
.setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
.setBootstrapIndexClass(bootstrapIndexClass)
.setBootstrapBasePath(bootstrapBasePath)
+ .setPartitionColumns(partitionColumns)
.initTable(sparkContext.hadoopConfiguration, path)
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index bd55930..72b26be 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -28,7 +28,8 @@
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex, Spark2ParsePartitionUtil, Spark3ParsePartitionUtil, SparkParsePartitionUtil}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.JavaConverters._
@@ -118,4 +119,13 @@
new Spark3RowSerDe(encoder)
}
}
+
+ def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = {
+ // TODO remove Spark2RowSerDe if Spark 2.x support is dropped
+ if (SPARK_VERSION.startsWith("2.")) {
+ new Spark2ParsePartitionUtil
+ } else {
+ new Spark3ParsePartitionUtil(conf)
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 02b5abd..fd3e078 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -17,16 +17,17 @@
package org.apache.hudi
+import scala.collection.JavaConverters._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.TypedProperties
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters.mapAsScalaMapConverter
-
import org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE
import org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE
import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP
import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP
+import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
/**
* WriterUtils to assist in write path in Datasource and tests.
@@ -81,4 +82,32 @@
params.foreach(kv => props.setProperty(kv._1, kv._2))
props
}
+
+ /**
+ * Get the partition columns to stored to hoodie.properties.
+ * @param parameters
+ * @return
+ */
+ def getPartitionColumns(parameters: Map[String, String]): String = {
+ val props = new TypedProperties()
+ props.putAll(parameters.asJava)
+ val keyGen = DataSourceUtils.createKeyGenerator(props)
+ getPartitionColumns(keyGen)
+ }
+
+ def getPartitionColumns(keyGen: KeyGenerator): String = {
+ keyGen match {
+ // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path filed format
+ // is: "field_name: field_type", we extract the field_name from the partition path field.
+ case c: BaseKeyGenerator
+ if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] =>
+ c.getPartitionPathFields.asScala.map(pathField =>
+ pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
+ .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
+ .mkString(",")
+
+ case b: BaseKeyGenerator => b.getPartitionPathFields.asScala.mkString(",")
+ case _=> null
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 13766da..4c2d332 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -201,7 +201,8 @@
val baseFiles = f.getAllFileSlices.iterator().filter(slice => slice.getBaseFile.isPresent).toList
val partitionedFile = if (baseFiles.nonEmpty) {
val baseFile = baseFiles.head.getBaseFile
- Option(PartitionedFile(InternalRow.empty, baseFile.get.getPath, 0, baseFile.get.getFileLen))
+ val filePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath)
+ Option(PartitionedFile(InternalRow.empty, filePath, 0, baseFile.get.getFileLen))
}
else {
Option.empty
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 50e2ec5..c9d413b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -21,7 +21,6 @@
import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
-import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hadoop.fs.Path
@@ -29,7 +28,7 @@
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
@@ -54,7 +53,7 @@
class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
val optParams: Map[String, String],
val userSchema: StructType,
- val globPaths: Seq[Path],
+ val globPaths: Option[Seq[Path]],
val metaClient: HoodieTableMetaClient)
extends BaseRelation with PrunedFilteredScan with Logging {
@@ -133,25 +132,54 @@
}
def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
- val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
- val fileStatuses = inMemoryFileIndex.allFiles()
- if (fileStatuses.isEmpty) {
- throw new HoodieException("No files found for reading in user provided path.")
+ val fileStatuses = if (globPaths.isDefined) {
+ // Load files from the global paths if it has defined to be compatible with the original mode
+ val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
+ inMemoryFileIndex.allFiles()
+ } else { // Load files by the HoodieFileIndex.
+ val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
+ Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))
+ hoodieFileIndex.allFiles
}
- val fsView = new HoodieTableFileSystemView(metaClient,
- metaClient.getActiveTimeline.getCommitsTimeline
- .filterCompletedInstants, fileStatuses.toArray)
- val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
- val latestCommit = fsView.getLastInstant.get().getTimestamp
- val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala
- val fileSplits = fileGroup.map(kv => {
- val baseFile = kv._1
- val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
- val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen)
- HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit,
- metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
- }).toList
- fileSplits
+ if (fileStatuses.isEmpty) { // If this an empty table, return an empty split list.
+ List.empty[HoodieMergeOnReadFileSplit]
+ } else {
+ val fsView = new HoodieTableFileSystemView(metaClient,
+ metaClient.getActiveTimeline.getCommitsTimeline
+ .filterCompletedInstants, fileStatuses.toArray)
+ val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
+ val latestCommit = fsView.getLastInstant.get().getTimestamp
+ val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala
+ val fileSplits = fileGroup.map(kv => {
+ val baseFile = kv._1
+ val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
+
+ val filePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.getFileStatus.getPath)
+ val partitionedFile = PartitionedFile(InternalRow.empty, filePath, 0, baseFile.getFileLen)
+ HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit,
+ metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
+ }).toList
+ fileSplits
+ }
+ }
+}
+
+object MergeOnReadSnapshotRelation {
+
+ def getFilePath(path: Path): String = {
+ // Here we use the Path#toUri to encode the path string, as there is a decode in
+ // ParquetFileFormat#buildReaderWithPartitionValues in the spark project when read the table
+ // .So we should encode the file path here. Otherwise, there is a FileNotException throw
+ // out.
+ // For example, If the "pt" is the partition path field, and "pt" = "2021/02/02", If
+ // we enable the URL_ENCODE_PARTITIONING_OPT_KEY and write data to hudi table.The data
+ // path in the table will just like "/basePath/2021%2F02%2F02/xxxx.parquet". When we read
+ // data from the table, if there are no encode for the file path,
+ // ParquetFileFormat#buildReaderWithPartitionValues will decode it to
+ // "/basePath/2021/02/02/xxxx.parquet" witch will result to a FileNotException.
+ // See FileSourceScanExec#createBucketedReadRDD in spark project which do the same thing
+ // when create PartitionedFile.
+ path.toUri.toString
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
new file mode 100644
index 0000000..08cc50d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.net.URLEncoder
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.{Config, TimestampType}
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
+import org.apache.spark.sql.execution.datasources.PartitionDirectory
+import org.apache.spark.sql.types.StringType
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class TestHoodieFileIndex extends HoodieClientTestBase {
+
+ var spark: SparkSession = _
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
+ HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
+ )
+
+ @BeforeEach override def setUp() {
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ initTestDataGenerator()
+ initFileSystem()
+ initMetaClient()
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testPartitionSchema(partitionEncode: Boolean): Unit = {
+ val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100)
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+ inputDF1.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
+ assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(","))
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array(
+ "org.apache.hudi.keygen.ComplexKeyGenerator",
+ "org.apache.hudi.keygen.SimpleKeyGenerator",
+ "org.apache.hudi.keygen.TimestampBasedKeyGenerator"))
+ def testPartitionSchemaForBuildInKeyGenerator(keyGenerator: String): Unit = {
+ val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100)
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+ inputDF1.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenerator)
+ .option(Config.TIMESTAMP_TYPE_FIELD_PROP, TimestampType.DATE_STRING.name())
+ .option(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy/MM/dd")
+ .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyy-MM-dd")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
+ assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(","))
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array(
+ "org.apache.hudi.keygen.CustomKeyGenerator",
+ "org.apache.hudi.keygen.CustomAvroKeyGenerator"))
+ def testPartitionSchemaForCustomKeyGenerator(keyGenerator: String): Unit = {
+ val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100)
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+ inputDF1.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenerator)
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition:simple")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
+ assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(","))
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testPartitionPruneWithPartitionEncode(partitionEncode: Boolean): Unit = {
+ val partitions = Array("2021/03/08", "2021/03/09", "2021/03/10", "2021/03/11", "2021/03/12")
+ val newDataGen = new HoodieTestDataGenerator(partitions)
+ val records1 = newDataGen.generateInsertsContainsAllPartitions("000", 100)
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+ inputDF1.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
+
+ val partitionFilter1 = EqualTo(attribute("partition"), literal("2021/03/08"))
+ val partitionName = if (partitionEncode) URLEncoder.encode("2021/03/08") else "2021/03/08"
+ val partitionAndFilesAfterPrune = fileIndex.listFiles(Seq(partitionFilter1), Seq.empty)
+ assertEquals(1, partitionAndFilesAfterPrune.size)
+
+ val PartitionDirectory(partitionValues, filesInPartition) = partitionAndFilesAfterPrune(0)
+ assertEquals(partitionValues.toSeq(Seq(StringType)).mkString(","), "2021/03/08")
+ assertEquals(getFileCountInPartitionPath(partitionName), filesInPartition.size)
+
+ val partitionFilter2 = And(
+ GreaterThanOrEqual(attribute("partition"), literal("2021/03/08")),
+ LessThan(attribute("partition"), literal("2021/03/10"))
+ )
+ val prunedPartitions = fileIndex.listFiles(Seq(partitionFilter2),
+ Seq.empty).map(_.values.toSeq(Seq(StringType)).mkString(",")).toList
+
+ assertEquals(List("2021/03/08", "2021/03/09"), prunedPartitions)
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testPartitionPruneWithMultiPartitionColumns(useMetaFileList: Boolean): Unit = {
+ val _spark = spark
+ import _spark.implicits._
+ // Test the case the partition column size is equal to the partition directory level.
+ val inputDF1 = (for (i <- 0 until 10) yield (i, s"a$i", 10 + i, 10000,
+ s"2021-03-0${i % 2 + 1}", "10")).toDF("id", "name", "price", "version", "dt", "hh")
+
+ inputDF1.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(RECORDKEY_FIELD_OPT_KEY, "id")
+ .option(PRECOMBINE_FIELD_OPT_KEY, "version")
+ .option(PARTITIONPATH_FIELD_OPT_KEY, "dt,hh")
+ .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[ComplexKeyGenerator].getName)
+ .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "false")
+ .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, useMetaFileList)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
+
+ val partitionFilter1 = And(
+ EqualTo(attribute("dt"), literal("2021-03-01")),
+ EqualTo(attribute("hh"), literal("10"))
+ )
+ val partitionAndFilesAfterPrune = fileIndex.listFiles(Seq(partitionFilter1), Seq.empty)
+ assertEquals(1, partitionAndFilesAfterPrune.size)
+
+ val PartitionDirectory(partitionValues, filesAfterPrune) = partitionAndFilesAfterPrune(0)
+ // The partition prune will work for this case.
+ assertEquals(partitionValues.toSeq(Seq(StringType)).mkString(","), "2021-03-01,10")
+ assertEquals(getFileCountInPartitionPath("2021-03-01/10"), filesAfterPrune.size)
+
+ val readDF1 = spark.read.format("hudi").load(basePath)
+ assertEquals(10, readDF1.count())
+ assertEquals(5, readDF1.filter("dt = '2021-03-01' and hh = '10'").count())
+
+ // Test the case that partition column size not match the partition directory level and
+ // partition column size is > 1. We will not trait it as partitioned table when read.
+ val inputDF2 = (for (i <- 0 until 10) yield (i, s"a$i", 10 + i, 100 * i + 10000,
+ s"2021/03/0${i % 2 + 1}", "10")).toDF("id", "name", "price", "version", "dt", "hh")
+ inputDF2.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(RECORDKEY_FIELD_OPT_KEY, "id")
+ .option(PRECOMBINE_FIELD_OPT_KEY, "version")
+ .option(PARTITIONPATH_FIELD_OPT_KEY, "dt,hh")
+ .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[ComplexKeyGenerator].getName)
+ .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "false")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ fileIndex.refresh()
+ val partitionFilter2 = And(
+ EqualTo(attribute("dt"), literal("2021/03/01")),
+ EqualTo(attribute("hh"), literal("10"))
+ )
+ val partitionAndFilesAfterPrune2 = fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
+
+ assertEquals(1, partitionAndFilesAfterPrune2.size)
+ val PartitionDirectory(partitionValues2, filesAfterPrune2) = partitionAndFilesAfterPrune2(0)
+ // The partition prune would not work for this case, so the partition value it
+ // returns is a InternalRow.empty.
+ assertEquals(partitionValues2, InternalRow.empty)
+ // The returned file size should equal to the whole file size in all the partition paths.
+ assertEquals(getFileCountInPartitionPaths("2021/03/01/10", "2021/03/02/10"),
+ filesAfterPrune2.length)
+ val readDF2 = spark.read.format("hudi").load(basePath)
+
+ assertEquals(10, readDF2.count())
+ // There are 5 rows in the dt = 2021/03/01 and hh = 10
+ assertEquals(5, readDF2.filter("dt = '2021/03/01' and hh ='10'").count())
+ }
+
+ private def attribute(partition: String): AttributeReference = {
+ AttributeReference(partition, StringType, true)()
+ }
+
+ private def literal(value: String): Literal = {
+ Literal.create(value)
+ }
+
+ private def getFileCountInPartitionPath(partitionPath: String): Int = {
+ metaClient.reloadActiveTimeline()
+ val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+ val fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants)
+ fileSystemView.getAllBaseFiles(partitionPath).iterator().asScala.toSeq.length
+ }
+
+ private def getFileCountInPartitionPaths(partitionPaths: String*): Int = {
+ partitionPaths.map(getFileCountInPartitionPath).sum
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index b671bc6..88ed65f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -18,6 +18,10 @@
package org.apache.hudi.functional
import java.sql.{Date, Timestamp}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -38,7 +42,6 @@
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
-import scala.collection.JavaConversions._
/**
* Basic tests on the spark datasource for COW table.
@@ -619,4 +622,51 @@
.load(basePath + "/*")
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0)
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean): Unit = {
+ val N = 20
+ // Test query with partition prune if URL_ENCODE_PARTITIONING_OPT_KEY has enable
+ val records1 = dataGen.generateInsertsContainsAllPartitions("000", N)
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+ inputDF1.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+
+ val countIn20160315 = records1.asScala.count(record => record.getPartitionPath == "2016/03/15")
+ // query the partition by filter
+ val count1 = spark.read.format("hudi")
+ .load(basePath)
+ .filter("partition = '2016/03/15'")
+ .count()
+ assertEquals(countIn20160315, count1)
+
+ // query the partition by path
+ val partitionPath = if (partitionEncode) "2016%2F03%2F15" else "2016/03/15"
+ val count2 = spark.read.format("hudi")
+ .load(basePath + s"/$partitionPath")
+ .count()
+ assertEquals(countIn20160315, count2)
+
+ // Second write with Append mode
+ val records2 = dataGen.generateInsertsContainsAllPartitions("000", N + 1)
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
+ inputDF2.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ // Incremental query without "*" in path
+ val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(basePath)
+ assertEquals(N + 1, hoodieIncViewDF1.count())
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 2a6a0a7..0746f6d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -130,6 +130,10 @@
hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF1.count())
assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == $updateTimestamp").count())
+ // Read without *
+ val hoodieROViewDF1WithBasePath = spark.read.format("hudi").load(basePath)
+ assertEquals(numRecords, hoodieROViewDF1WithBasePath.count())
+ assertEquals(numRecordsUpdate, hoodieROViewDF1WithBasePath.filter(s"timestamp == $updateTimestamp").count())
verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false)
}
@@ -149,7 +153,8 @@
.save(srcPath)
// Perform bootstrap
- val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+ val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+ DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -201,11 +206,15 @@
})
// Perform bootstrap
- val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+ val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+ DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF1.count())
+ // Read without *
+ val hoodieROViewWithBasePathDF1 = spark.read.format("hudi").load(basePath)
+ assertEquals(numRecords, hoodieROViewWithBasePathDF1.count())
// Perform upsert based on the written bootstrap table
val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
@@ -268,7 +277,8 @@
})
// Perform bootstrap
- val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+ DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi")
@@ -304,6 +314,13 @@
.load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF2.count())
assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
+ // Test query without "*" for MOR READ_OPTIMIZED
+ val hoodieROViewDFWithBasePath = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
+ DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .load(basePath)
+ assertEquals(numRecords, hoodieROViewDFWithBasePath.count())
+ assertEquals(numRecordsUpdate, hoodieROViewDFWithBasePath.filter(s"timestamp == $updateTimestamp").count())
}
@Test def testMetadataBootstrapMORPartitioned(): Unit = {
@@ -325,7 +342,8 @@
})
// Perform bootstrap
- val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+ DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi")
@@ -333,6 +351,12 @@
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF1.count())
+ // Read bootstrapped table without "*"
+ val hoodieROViewDFWithBasePath = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
+ DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .load(basePath)
+ assertEquals(numRecords, hoodieROViewDFWithBasePath.count())
// Perform upsert based on the written bootstrap table
val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
@@ -420,6 +444,9 @@
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF1.count())
+ val hoodieROViewDFWithBasePath = spark.read.format("hudi").load(basePath)
+ assertEquals(numRecords, hoodieROViewDFWithBasePath.count())
+
// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
@@ -445,13 +472,15 @@
verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = false)
}
- def runMetadataBootstrapAndVerifyCommit(tableType: String): String = {
+ def runMetadataBootstrapAndVerifyCommit(tableType: String,
+ partitionColumns: Option[String] = None): String = {
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionColumns.getOrElse(""))
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName)
.mode(SaveMode.Overwrite)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 92024a3..00c40abf1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -17,6 +17,7 @@
package org.apache.hudi.functional
+import scala.collection.JavaConverters._
import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
@@ -31,6 +32,8 @@
import org.apache.spark.sql.functions._
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.collection.JavaConversions._
@@ -562,4 +565,53 @@
df.show(1)
df.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history").show(1)
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testQueryMORWithBasePathAndFileIndex(partitionEncode: Boolean): Unit = {
+ val N = 20
+ // Test query with partition prune if URL_ENCODE_PARTITIONING_OPT_KEY has enable
+ val records1 = dataGen.generateInsertsContainsAllPartitions("000", N)
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+ inputDF1.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+
+ val countIn20160315 = records1.asScala.count(record => record.getPartitionPath == "2016/03/15")
+ // query the partition by filter
+ val count1 = spark.read.format("hudi")
+ .load(basePath)
+ .filter("partition = '2016/03/15'")
+ .count()
+ assertEquals(countIn20160315, count1)
+
+ // query the partition by path
+ val partitionPath = if (partitionEncode) "2016%2F03%2F15" else "2016/03/15"
+ val count2 = spark.read.format("hudi")
+ .load(basePath + s"/$partitionPath")
+ .count()
+ assertEquals(countIn20160315, count2)
+
+ // Second write with Append mode
+ val records2 = dataGen.generateInsertsContainsAllPartitions("000", N + 1)
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
+ inputDF2.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ // Incremental query without "*" in path
+ val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(basePath)
+ assertEquals(N + 1, hoodieIncViewDF1.count())
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
new file mode 100644
index 0000000..5bf0284
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import java.util.TimeZone
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.execution.datasources.PartitioningUtils.PartitionValues
+import org.apache.spark.sql.types.DataType
+
+class Spark2ParsePartitionUtil extends SparkParsePartitionUtil {
+ override def parsePartition(path: Path, typeInference: Boolean,
+ basePaths: Set[Path],
+ userSpecifiedDataTypes: Map[String, DataType],
+ timeZone: TimeZone): Option[PartitionValues] = {
+ PartitioningUtils.parsePartition(path, typeInference,
+ basePaths, userSpecifiedDataTypes, timeZone)._1
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
new file mode 100644
index 0000000..ea9cc788
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import java.util.TimeZone
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils.{PartitionValues, timestampPartitionPattern}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+
+class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil {
+
+ override def parsePartition(path: Path, typeInference: Boolean,
+ basePaths: Set[Path], userSpecifiedDataTypes: Map[String, DataType],
+ timeZone: TimeZone): Option[PartitionValues] = {
+ val dateFormatter = DateFormatter(timeZone.toZoneId)
+ val timestampFormatter = TimestampFormatter(timestampPartitionPattern,
+ timeZone.toZoneId, isParsing = true)
+
+ PartitioningUtils.parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes,
+ conf.validatePartitionColumns, timeZone.toZoneId, dateFormatter, timestampFormatter)._1
+ }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 336639c..01a374d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -20,6 +20,7 @@
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.HoodieWriterUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -235,12 +236,15 @@
}
} else {
this.commitTimelineOpt = Option.empty();
+ String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator);
+
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder("archived")
.setPayloadClassName(cfg.payloadClassName)
.setBaseFileFormat(cfg.baseFileFormat)
+ .setPartitionColumns(partitionColumns)
.initTable(new Configuration(jssc.hadoopConfiguration()),
cfg.targetBasePath);
}
@@ -326,12 +330,14 @@
}
}
} else {
+ String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder("archived")
.setPayloadClassName(cfg.payloadClassName)
.setBaseFileFormat(cfg.baseFileFormat)
+ .setPartitionColumns(partitionColumns)
.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
}