blob: 1172b657f64c6c83d7c75bb1335744417a426253 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hudi.functional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.api.condition.DisabledIf
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource}
import java.math.BigInteger
import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
import scala.util.Random
/**
 * Functional tests for the Column Stats Index (CSI) kept in the Hudi metadata table (MT).
 *
 * The tests write datasets into a Hudi table with the MT and its column-stats partition
 * enabled, load the transposed CSI through [[ColumnStatsIndexSupport]], and compare it with
 * expected fixtures and, where possible, with statistics computed directly from the written
 * Parquet files.
 *
 * The whole class is disabled when `HoodieSparkUtils#gteqSpark3_3` holds (presumably Spark 3.3+)
 * because of Jackson version conflicts (HUDI-5352).
 */
@DisabledIf(value = "org.apache.hudi.HoodieSparkUtils#gteqSpark3_3",
disabledReason = "Jackson version conflicts (HUDI-5352)")
class TestColumnStatsIndex extends HoodieClientTestBase {
// Spark session shared by the tests; assigned in `setUp`.
var spark: SparkSession = _
// Schema of the source datasets (c1..c8: int, string, decimal(9,3), timestamp, short, date,
// binary, byte). It is also the table schema handed to `ColumnStatsIndexSupport`, and the
// basis of the expected CSI schema (see the `composeIndexSchema` usages).
val sourceTableSchema =
new StructType()
.add("c1", IntegerType)
.add("c2", StringType)
.add("c3", DecimalType(9,3))
.add("c4", TimestampType)
.add("c5", ShortType)
.add("c6", DateType)
.add("c7", BinaryType)
.add("c8", ByteType)
// Per-test setup: takes the Spark session from the base class's `sqlContext`.
override def setUp() {
spark = sqlContext.sparkSession
// Per-test teardown (overrides the base-class hook).
override def tearDown() = {
/**
 * End-to-end CSI test. It performs three successive writes and validates the CSI after each:
 *
 *  1. insert `index/colstats/input-table-json` (`SaveMode.Overwrite`);
 *  2. insert `index/colstats/another-input-table-json` (`SaveMode.Append`);
 *  3. upsert `index/colstats/update-input-table-json` (`SaveMode.Append`). The expected CSI
 *     fixture for this step differs between COW and MOR tables.
 *
 * @param testCase table type, full-log-scan flag and in-memory-read flag for this run
 */
def testMetadataColumnStatsIndex(testCase: ColumnStatsTestCase): Unit = {
// Enable the metadata table together with its column-stats partition.
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
// Write options shared by all three writes (record key `c1`), with the metadata options merged in.
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
RECORDKEY_FIELD.key -> "c1",
// NOTE: Currently this setting is honored differently by the MT partitions:
//       - Files: uses it
//       - Column Stats: does NOT use it (defaults to doing "point-lookups")
HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> testCase.forceFullLogScan.toString,
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
) ++ metadataOpts
// Step 1: initial insert, overwriting any existing table.
doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
dataSourcePath = "index/colstats/input-table-json",
expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json",
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite)
// Step 2: a second insert, appended to the table.
doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
dataSourcePath = "index/colstats/another-input-table-json",
expectedColStatsSourcePath = "index/colstats/updated-column-stats-index-table.json",
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
// Step 3: upsert of the `update-input-table-json` dataset.
// NOTE: MOR and COW have different fixtures because MOR tables carry delta-log files
//       (holding deferred updates), so their stats diverge from COW.
val expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) {
} else {
doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
/**
 * Verifies the CSI value count and the `c2` null count for a four-row dataset
 * (`c1` = 1..4) in which exactly one row (`c1` = 3) has a null `c2`.
 *
 * @param tableType table type under test
 */
def testMetadataColumnStatsIndexValueCount(tableType: HoodieTableType): Unit = {
// Enable the metadata table together with its column-stats partition.
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
// Write options (record key `c1`), with the metadata options merged in.
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
DataSourceWriteOptions.TABLE_TYPE.key ->,
RECORDKEY_FIELD.key -> "c1",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
) ++ metadataOpts
// `c1` (the record key) is declared non-nullable; `c2` is nullable, so a null value can be written.
val schema = StructType(StructField("c1", IntegerType, false) :: StructField("c2", StringType, true) :: Nil)
val inputDF = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(Row(1, "v1"), Row(2, "v2"), Row(3, null), Row(4, "v4"))),
.sort("c1", "c2")
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
// Reload the meta client after the write.
metaClient = HoodieTableMetaClient.reload(metaClient)
val metadataConfig = HoodieMetadataConfig.newBuilder()
val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
// Load the CSI for `c2` only; the second argument is `shouldReadInMemory` (false here).
columnStatsIndex.loadTransposed(Seq("c2"), false) { transposedDF =>
val result ="valueCount", "c2_nullCount")
// Expect 4 values in total and exactly one null in `c2` (from Row(3, null)).
// NOTE(review): assertEquals(expected, actual) would report the actual value on failure.
assertTrue(result.getLong(0) == 4)
assertTrue(result.getLong(1) == 1)
/**
 * Verifies loading of the transposed CSI when only `c1`, `c2` and `c3` are indexed
 * (via `COLUMN_STATS_INDEX_FOR_COLUMNS`) and the requested projection differs from that set:
 *
 *  - Case #1: only non-indexed columns are requested -> empty result.
 *  - Case #2: both indexed and non-indexed columns are requested -> the result matches the
 *    expected fixture and the stats computed manually from the Parquet files.
 *  - Case #3: after an insert whose schema omits `c2` and `c3`, all columns are requested ->
 *    the result matches the updated expected fixture and the manually computed stats.
 *
 * @param shouldReadInMemory forwarded to `ColumnStatsIndexSupport.loadTransposed`
 */
@ValueSource(booleans = Array(true, false))
def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: Boolean): Unit = {
// Only these columns are indexed by the CSI.
val targetColumnsToIndex = Seq("c1", "c2", "c3")
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> targetColumnsToIndex.mkString(",")
// Write options (record key `c1`), with the metadata options merged in.
val opts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
RECORDKEY_FIELD.key -> "c1",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
) ++ metadataOpts
val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString
// NOTE: Schema here is provided for validation that the input data is in the appropriate format
val inputDF =
.repartition(4, new Column("c1"))
// Small Parquet max file size (10 * 1024, presumably bytes), presumably so the data spans several files.
.option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
metaClient = HoodieTableMetaClient.reload(metaClient)
val metadataConfig = HoodieMetadataConfig.newBuilder()
// Case #1: Empty CSI projection
// Projection is requested for columns which are NOT indexed
// by the CSI, so the transposed result is expected to be empty
// These are NOT indexed
val requestedColumns = Seq("c4")
val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { emptyTransposedColStatsDF =>
assertEquals(0, emptyTransposedColStatsDF.collect().length)
// Case #2: Partial CSI projection
// Projection is requested for a set of columns, some of which are
// NOT indexed by the CSI
// We have to include "c1", since we sort the expected outputs by this column
val requestedColumns = Seq("c4", "c1")
val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted, sourceTableSchema)
// Match against expected column stats table
val expectedColStatsIndexTableDf =
// Collect Column Stats manually (reading individual Parquet files)
val manualColStatsTableDF =
buildColumnStatsTableManually(basePath, requestedColumns, targetColumnsToIndex, expectedColStatsSchema)
val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { partialTransposedColStatsDF =>
assertEquals(expectedColStatsIndexTableDf.schema, partialTransposedColStatsDF.schema)
// NOTE: We have to drop the `fileName` column as it contains semi-random components
// that we can't control in this test. Nevertheless, since we manually verify composition of the
// ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(partialTransposedColStatsDF.drop("fileName"))))
assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(partialTransposedColStatsDF)))
// Case #3: Aligned CSI projection
// Projection is requested for a set of columns, some of which are
// indexed only for a subset of files
// NOTE: The update we're writing intentionally omits some of the columns
// present in an earlier source
val missingCols = Seq("c2", "c3")
val partialSourceTableSchema = StructType(sourceTableSchema.fields.filterNot(f => missingCols.contains(
val updateJSONTablePath = getClass.getClassLoader.getResource("index/colstats/partial-another-input-table-json").toString
val updateDF =
.option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
metaClient = HoodieTableMetaClient.reload(metaClient)
// Request every column of the original schema, including those absent from the last write.
val requestedColumns = sourceTableSchema.fieldNames
val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted, sourceTableSchema)
val expectedColStatsIndexUpdatedDF =
// Collect Column Stats manually (reading individual Parquet files)
val manualUpdatedColStatsTableDF =
buildColumnStatsTableManually(basePath, requestedColumns, targetColumnsToIndex, expectedColStatsSchema)
val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
// Even though the last update was written with a new schema (a subset of the original table schema),
// we should be able to read the CSI, which will be properly padded (with nulls) after transposition
columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { transposedUpdatedColStatsDF =>
assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
/**
 * Verifies that, for every requested column of a Parquet file, the range returned by
 * `ParquetUtils.readRangeFromParquetMetadata` satisfies min <= max. The data comes from
 * `generateRandomDataFrame`. The timestamp column `c4` is skipped.
 */
def testParquetMetadataRangeExtraction(): Unit = {
val df = generateRandomDataFrame(spark)
// Temp directory expected to hold the Parquet output of `df` (presumably written here; TODO confirm).
val pathStr = tempDir.resolve("min-max").toAbsolutePath.toString
val utils = new ParquetUtils
val conf = new Configuration()
val path = new Path(pathStr)
val fs = path.getFileSystem(conf)
// Pick the first `.parquet` file listed in the directory.
// NOTE(review): the lambda parameter `fs` shadows the FileSystem `fs` above, and `.head`
// throws if no Parquet file is present.
val parquetFilePath = fs.listStatus(path).filter(fs => fs.getPath.getName.endsWith(".parquet")).toSeq.head.getPath
val ranges = utils.readRangeFromParquetMetadata(conf, parquetFilePath,
Seq("c1", "c2", "c3a", "c3b", "c3c", "c4", "c5", "c6", "c7", "c8").asJava)
ranges.asScala.foreach(r => {
// NOTE: Unfortunately Parquet can't compute statistics for Timestamp column, hence we
// skip it in our assertions
if (r.getColumnName.equals("c4")) {
// scalastyle:off return
// scalastyle:on return
// NOTE(review): `min` and `max` are unused; the assertion below re-reads the getters.
val min = r.getMinValue
val max = r.getMaxValue
// Range sanity check: min must not compare greater than max.
assertTrue(r.getMinValue.asInstanceOf[Comparable[Object]].compareTo(r.getMaxValue.asInstanceOf[Object]) <= 0)
/**
 * Writes one dataset into the test table and then validates the resulting CSI.
 *
 * @param testCase                   table type / read-mode parameters of the current run
 * @param metadataOpts               metadata-table options, forwarded to the CSI validation
 * @param hudiOpts                   Hudi write options (their use in the write is not fully visible here; TODO confirm)
 * @param dataSourcePath             classpath resource holding the input dataset
 * @param expectedColStatsSourcePath classpath fixture with the expected transposed CSI
 * @param operation                  Hudi write operation (insert or upsert in the callers)
 * @param saveMode                   Spark save mode used for the write
 */
private def doWriteAndValidateColumnStats(testCase: ColumnStatsTestCase,
metadataOpts: Map[String, String],
hudiOpts: Map[String, String],
dataSourcePath: String,
expectedColStatsSourcePath: String,
operation: String,
saveMode: SaveMode): Unit = {
val sourceJSONTablePath = getClass.getClassLoader.getResource(dataSourcePath).toString
// NOTE: Schema here is provided for validation that the input data is in the appropriate format
val inputDF =
.repartition(4, new Column("c1"))
.option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
.option(DataSourceWriteOptions.OPERATION.key, operation)
metaClient = HoodieTableMetaClient.reload(metaClient)
// The routine that validates column stats manually (by actually reading every column of every file)
// currently supports only Parquet files. We therefore skip it when delta-log files may be present,
// and validate only in these cases: (1) COW: all operations; (2) MOR: inserts only.
val shouldValidateColumnStatsManually = testCase.tableType == HoodieTableType.COPY_ON_WRITE ||
testCase, metadataOpts, expectedColStatsSourcePath, shouldValidateColumnStatsManually)
/**
 * Builds a reference column-stats table. It reads every `.parquet` file under `tablePath`
 * (listed recursively) and aggregates each file with Spark SQL expressions.
 *
 * For each column `c` contained in `includedCols`, it emits `c_minValue`, `c_maxValue` and
 * `c_nullCount`. For columns not in `indexedCols`, all three are `null`. Each file's output
 * also carries the file name (`file`) and a row count (`valueCount`).
 *
 * @param tablePath    root directory of the table
 * @param includedCols columns to include in the output
 * @param indexedCols  columns for which real statistics are computed
 * @param indexSchema  expected CSI schema (its use is not visible here; presumably applied to the result)
 * @return the per-file statistics as a DataFrame
 */
private def buildColumnStatsTableManually(tablePath: String,
includedCols: Seq[String],
indexedCols: Seq[String],
indexSchema: StructType): DataFrame = {
// Collect all `.parquet` files under the table path (recursive listing).
val files = {
val it = fs.listFiles(new Path(tablePath), true)
// NOTE(review): `:+` on a List-backed Seq copies it on every append (O(n^2) overall);
// a builder or Vector would avoid this.
var seq = Seq[LocatedFileStatus]()
while (it.hasNext) {
seq = seq :+
// NOTE(review): the lambda parameter `fs` shadows the outer `fs` used for the listing.
seq.filter(fs => fs.getPath.getName.endsWith(".parquet"))
files.flatMap(file => {
val df =
// Aggregate expressions for this file: file name, row count, then per-column min/max/null count.
val exprs: Seq[String] =
s"'${typedLit(file.getPath.getName)}' AS file" +:
s"sum(1) AS valueCount" +:
.filter(col => includedCols.contains(col))
.flatMap(col => {
val minColName = s"${col}_minValue"
val maxColName = s"${col}_maxValue"
// Indexed columns get real statistics; the others are padded with nulls.
if (indexedCols.contains(col)) {
s"min($col) AS $minColName",
s"max($col) AS $maxColName",
s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount"
} else {
s"null AS $minColName",
s"null AS $maxColName",
s"null AS ${col}_nullCount"
df.selectExpr(exprs: _*)
/**
 * Loads the transposed CSI for every column of `sourceTableSchema` and compares it with the
 * expected fixture at `expectedColStatsSourcePath`. When `validateColumnStatsManually` is set,
 * it also compares the CSI with statistics computed directly from the table's Parquet files.
 *
 * @param testCase                    supplies `shouldReadInMemory` for loading the CSI
 * @param metadataOpts                metadata-table options (presumably used to build `metadataConfig`; TODO confirm)
 * @param expectedColStatsSourcePath  classpath fixture with the expected transposed CSI
 * @param validateColumnStatsManually whether to also compare against stats read from the Parquet files
 */
private def validateColumnStatsIndex(testCase: ColumnStatsTestCase,
metadataOpts: Map[String, String],
expectedColStatsSourcePath: String,
validateColumnStatsManually: Boolean): Unit = {
val metadataConfig = HoodieMetadataConfig.newBuilder()
val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
// Sort keys applied to both sides of each comparison so that row order is deterministic.
val validationSortColumns = Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue")
columnStatsIndex.loadTransposed(sourceTableSchema.fieldNames, testCase.shouldReadInMemory) { transposedColStatsDF =>
// Match against expected column stats table
val expectedColStatsIndexTableDf =
assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema)
// NOTE: We have to drop the `fileName` column as it contains semi-random components
// that we can't control in this test. Nevertheless, since we manually verify composition of the
// ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
assertEquals(asJson(sort(expectedColStatsIndexTableDf, validationSortColumns)),
asJson(sort(transposedColStatsDF.drop("fileName"), validationSortColumns)))
if (validateColumnStatsManually) {
// TODO(HUDI-4557): support validation of column stats of avro log files
// Collect Column Stats manually (reading individual Parquet files)
val manualColStatsTableDF =
buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, sourceTableSchema.fieldNames, expectedColStatsSchema)
assertEquals(asJson(sort(manualColStatsTableDF, validationSortColumns)),
asJson(sort(transposedColStatsDF, validationSortColumns)))
/**
 * Generates a DataFrame with 1001 rows (`c1` = 0 to 1000, built from a single-partition RDD)
 * covering a range of column types. It is used to exercise Parquet min/max statistics extraction.
 *
 * NOTE(review): values come from unseeded random generators and the current time, so the
 * data differs between runs.
 *
 * @param spark session used to create the RDD and the DataFrame
 * @return DataFrame with columns c1, c2, c3a, c3b, c3c, c4, c5, c6, c7, c8
 */
private def generateRandomDataFrame(spark: SparkSession): DataFrame = {
// Local schema (shadows the class-level `sourceTableSchema`) with three decimal variants.
val sourceTableSchema =
new StructType()
.add("c1", IntegerType)
.add("c2", StringType)
// NOTE: We're testing different values for precision of the decimal to make sure
// we execute paths bearing different underlying representations in Parquet
// REF:
.add("c3a", DecimalType(9, 3))
.add("c3b", DecimalType(10, 3))
.add("c3c", DecimalType(20, 3))
.add("c4", TimestampType)
.add("c5", ShortType)
.add("c6", DateType)
.add("c7", BinaryType)
.add("c8", ByteType)
val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item =>
val c1 = Integer.valueOf(item)
// Random 10-character string.
val c2 = Random.nextString(10)
// |nextInt % 2^24| < 2^24, so the unscaled value has at most 8 digits and fits DecimalType(9, 3).
val c3a = java.math.BigDecimal.valueOf(Random.nextInt() % (1 << 24), 3)
// |nextLong % 2^32| < 2^32, so the unscaled value has at most 10 digits and fits DecimalType(10, 3).
val c3b = java.math.BigDecimal.valueOf(Random.nextLong() % (1L << 32), 3)
// NOTE: `new BigInteger(64, rnd)` is uniform over [0, 2^64 - 1] (at most 20 digits), which keeps
// the value within the target DecimalType(20, 3) range
val c3c = new java.math.BigDecimal(new BigInteger(64, new java.util.Random()), 3)
// Current wall-clock time (timestamp stats are skipped by testParquetMetadataRangeExtraction).
val c4 = new Timestamp(System.currentTimeMillis())
// (item + 16) / 10 with item in 0..1000 yields 1..101, well within Short range.
val c5 = java.lang.Short.valueOf(s"${(item + 16) / 10}")
// Month 1..11 and day 1..28, so every generated 2020 date is valid.
val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")
// Single-byte array; `toByte` wraps values above 127.
val c7 = Array(item).map(_.toByte)
// Constant byte 9.
val c8 = java.lang.Byte.valueOf("9")
RowFactory.create(c1, c2, c3a, c3b, c3c, c4, c5, c6, c7, c8)
spark.createDataFrame(rdd, sourceTableSchema)
// Renders `df` for assertion comparisons (body not visible here; presumably JSON, per the name).
private def asJson(df: DataFrame) =
/** Sorts `df` by `c1_maxValue`, then `c1_minValue`, so comparisons do not depend on row order. */
private def sort(df: DataFrame): DataFrame = {
sort(df, Seq("c1_maxValue", "c1_minValue"))
/**
 * Sorts the rows of `df` by `sortColumns` (in the given priority order), so comparisons do not
 * depend on the order in which files or rows were produced.
 *
 * @param df          DataFrame to sort
 * @param sortColumns sort keys; must be non-empty, since `head` is taken
 */
private def sort(df: DataFrame, sortColumns: Seq[String]): DataFrame = {
// Column names in alphabetical order, presumably used to put columns in a canonical order.
val sortedCols = df.columns.sorted
// NOTE(review): the second comment line below has code (`sortedCols.tail: _*)`) fused into it,
// so no `select` over `sortedCols` is visible here; TODO confirm against the upstream file.
// Sort dataset by specified columns (to minimize non-determinism in case multiple files have the same
// value of the first column), sortedCols.tail: _*)
.sort(sortColumns.head, sortColumns.tail: _*)
/** Companion holding the test-case definition and parameter provider for the CSI tests. */
object TestColumnStatsIndex {
/**
 * Parameters of one `testMetadataColumnStatsIndex` run.
 *
 * @param tableType          table type to write
 * @param forceFullLogScan   value for `HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES`
 * @param shouldReadInMemory forwarded to `ColumnStatsIndexSupport.loadTransposed`
 */
case class ColumnStatsTestCase(tableType: HoodieTableType, forceFullLogScan: Boolean, shouldReadInMemory: Boolean)
/**
 * Provides, for each `tableType` (presumably every `HoodieTableType`), all four combinations
 * of `forceFullLogScan` and `shouldReadInMemory`.
 */
def testMetadataColumnStatsIndexParams:[Arguments] = { =>
Seq(Arguments.arguments(ColumnStatsTestCase(tableType, forceFullLogScan = false, shouldReadInMemory = true)),
Arguments.arguments(ColumnStatsTestCase(tableType, forceFullLogScan = false, shouldReadInMemory = false)),
Arguments.arguments(ColumnStatsTestCase(tableType, forceFullLogScan = true, shouldReadInMemory = false)),
Arguments.arguments(ColumnStatsTestCase(tableType, forceFullLogScan = true, shouldReadInMemory = true)))
): _*)