RFC-38: Spark Datasource V2 Integration

Proposers

  • @leesf

Approvers

  • @vinothchandar
  • @xishiyan
  • @YannByron

Status

JIRA: https://issues.apache.org/jira/browse/HUDI-1297

Abstract

Today, Hudi still uses the DataSource V1 API and relies heavily on the RDD API for indexing, repartitioning, and so on, given the flexibility of the RDD API. This works well: with the DataSource V1 API, Hudi provides complete read/write, update, and automatic small-file handling capabilities. However, the DataSource V2 API has stabilized through continuous development, while the V1 API is aging and the Spark community no longer invests much in maintaining it. We therefore propose migrating to the DataSource V2 API, using the richer pushdown filters it provides and integrating with RFC-27 to offer more powerful query capabilities. Hudi could also benefit from any further evolution or optimization of the V2 API.

Background

The current Hudi read and write paths use the DataSource V1 API, and the implementation class is DefaultSource:

/**
 * Hoodie Spark Datasource, for reading and writing hoodie tables
 */
class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with Serializable {
  ...
}

For writing (batch write), the following method is called:

override def createRelation(sqlContext: SQLContext,
                            mode: SaveMode,
                            optParams: Map[String, String],
                            df: DataFrame): BaseRelation = {
  val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
  val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
  val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)

  if (translatedOptions(OPERATION.key).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
    HoodieSparkSqlWriter.bootstrap(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
  } else {
    HoodieSparkSqlWriter.write(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
  }
  new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}

For querying, the following method returns a BaseRelation (when no schema is provided):

override def createRelation(sqlContext: SQLContext,
                            parameters: Map[String, String]): BaseRelation = {
  createRelation(sqlContext, parameters, null)
}

For streaming writes and reads, DefaultSource#createSink and DefaultSource#createSource are called respectively. In version 0.9.0, the bulk_insert row mode was introduced to speed up bulk_insert. It implements the SupportsWrite V2 API and uses HoodieDataSourceInternalTable for writing; currently only the bulk_insert operation is supported.

Implementation

Spark provides a complete V2 API, including CatalogPlugin, SupportsWrite, SupportsRead, and various pushdown interfaces such as SupportsPushDownFilters, SupportsPushDownAggregates, and SupportsPushDownRequiredColumns.

We would define the key abstraction, called HoodieInternalV2Table, which implements the Table, SupportsWrite, and SupportsRead interfaces to provide writing and reading capabilities.

Writing Path

Hudi relies heavily on RDD APIs on the write path, for example for indexing, which determines whether a record is an update or an insert. Migrating this logic to the DataSource V2 write path would require a large refactoring and may not be feasible. Instead, we can fall back to the V1 write path, since Spark 3.2.0 provides the V1Write interface to bridge the V1 and V2 APIs.
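To make the indexing step concrete, the following is a minimal sketch of how incoming records could be tagged as inserts or updates. It is a plain in-memory model, not Hudi's RDD-based implementation; Record, Tagged, and the Map-based index are hypothetical names introduced only for illustration.

```scala
object IndexTaggingSketch {
  // Hypothetical record: a key plus an opaque payload.
  final case class Record(key: String, value: String)

  // Result of the index lookup for one record.
  sealed trait Tagged
  final case class Insert(record: Record) extends Tagged
  final case class Update(record: Record, fileId: String) extends Tagged

  // Assumption: the index maps a record key to the id of the file that already holds it.
  // A key found in the index becomes an update; any other key becomes an insert.
  def tag(records: Seq[Record], index: Map[String, String]): Seq[Tagged] =
    records.map { record =>
      index.get(record.key) match {
        case Some(fileId) => Update(record, fileId)
        case None         => Insert(record)
      }
    }
}
```

In Hudi this lookup runs over distributed data, which is part of why the write path depends on the RDD API.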

The writing path code snippet is below.

class HoodieInternalV2Table extends Table with SupportsWrite with V2TableWithV1Fallback {

  override def name(): String = {
    // get hudi table name
  }

  override def schema(): StructType = {
    // get hudi table schema
  }

  override def partitioning(): Array[Transform] = {
    // get partitioning of hudi table.
  }

  override def capabilities(): Set[TableCapability] = {
    // Set(BATCH_WRITE, BATCH_READ,TRUNCATE,...)
  }

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    // HoodieV1WriteBuilder
  }
}

HoodieV1WriteBuilder is defined as follows.

private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
                                   hoodieCatalogTable: HoodieCatalogTable,
                                   spark: SparkSession)
  extends SupportsTruncate with SupportsOverwrite with ProvidesHoodieConfig {

  override def truncate(): HoodieV1WriteBuilder = {
    this
  }

  override def overwrite(filters: Array[Filter]): WriteBuilder = {
    this
  }

  override def build(): V1Write = new V1Write {
    override def toInsertableRelation: InsertableRelation = {
      // InsertableRelation
    }
  }
}

Querying Path

For V2 querying, Spark provides various pushdown interfaces, such as SupportsPushDownFilters, SupportsPushDownAggregates, SupportsPushDownRequiredColumns, and SupportsRuntimeFiltering, which are clearer and more flexible than the V1 interface. The V2 interface also supports reading columnar file formats such as Parquet and ORC, and it lets the data source define how the input is split and how many partitions are produced, which makes it possible to compute more accurate splits and speed up queries on the Hudi side. However, in the first stage we also fall back to the V1 read path for querying, which means DataSourceV2Relation must be converted to DefaultSource during the analysis stage so that the changes stay well controlled. The code snippet is shown below; HoodieSpark3Analysis should be injected if the Spark version is 3.2.0 or later.


case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
  with SparkAdapterSupport with ProvidesHoodieConfig {

  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
    case dsv2@DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) =>
      val output = dsv2.output
      val catalogTable = if (d.catalogTable.isDefined) {
        Some(d.v1Table)
      } else {
        None
      }
      val relation = new DefaultSource().createRelation(new SQLContext(sparkSession),
        buildHoodieConfig(d.hoodieCatalogTable))
      LogicalRelation(relation, output, catalogTable, isStreaming = false)
  }
}
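The version condition for injecting the rule could be checked along the following lines. This is a sketch only: SparkVersionGate and its methods are hypothetical names, and the version string is assumed to come from something like SparkSession#version. It compares versions numerically, since a plain string comparison would rank "3.10.0" below "3.2.0".

```scala
object SparkVersionGate {
  // Extracts the leading "major.minor" pair from strings such as "3.2.0" or "3.3.1-SNAPSHOT".
  def majorMinor(version: String): Option[(Int, Int)] = {
    val Pattern = """^(\d+)\.(\d+).*""".r
    version match {
      case Pattern(major, minor) => Some((major.toInt, minor.toInt))
      case _                     => None
    }
  }

  // True when the version is 3.2 or later; unparseable versions are treated as unsupported.
  def supportsV2Fallback(version: String): Boolean =
    majorMinor(version).exists { case (major, minor) =>
      major > 3 || (major == 3 && minor >= 2)
    }
}
```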

In the second stage, we would use the V2 reading interface and define HoodieBatchScanBuilder to provide querying capability. The workflow of the querying process is shown in the figure below. The PartitionReaderFactory is located on the driver and the PartitionReader on the executor.

The querying path code sample is below.

class HoodieBatchScanBuilder extends ScanBuilder
  with SupportsPushDownFilters
  with SupportsPushDownRequiredColumns {

  override def build(): Scan = {
    // HoodieScan
  }

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    // record the filters
  }

  override def pushedFilters(): Array[Filter] = {
    // pushed filters
  }

  override def pruneColumns(requiredSchema: StructType): Unit = {
    // record the pruned columns
  }
}
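The pushdown contract in the skeleton above can be illustrated with a small, Spark-free model. In Spark's SupportsPushDownFilters, pushFilters returns the filters that still have to be evaluated after the scan, while pushedFilters returns the filters the source accepted. The Filter classes below are simplified stand-ins rather than the org.apache.spark.sql.sources types, and the rule that only predicates on partition columns can be pushed is an assumption made for this sketch, not Hudi's actual policy.

```scala
object PushdownSketch {
  // Simplified stand-ins for Spark's filter classes.
  sealed trait Filter
  final case class EqualTo(attribute: String, value: Any) extends Filter
  final case class GreaterThan(attribute: String, value: Any) extends Filter
  final case class StringContains(attribute: String, value: String) extends Filter

  // Hypothetical builder mirroring the pushFilters / pushedFilters / pruneColumns contract.
  final class SketchScanBuilder(partitionColumns: Set[String]) {
    private var pushed: Array[Filter] = Array.empty[Filter]
    private var required: Seq[String] = Seq.empty

    // Assumption: only equality and range predicates on partition columns are supported.
    private def canPush(filter: Filter): Boolean = filter match {
      case EqualTo(attribute, _)     => partitionColumns.contains(attribute)
      case GreaterThan(attribute, _) => partitionColumns.contains(attribute)
      case _                         => false
    }

    // Records the supported filters and returns the rest for the engine to evaluate after the scan.
    def pushFilters(filters: Array[Filter]): Array[Filter] = {
      val (supported, unsupported) = filters.partition(canPush)
      pushed = supported
      unsupported
    }

    // Filters accepted by the source.
    def pushedFilters(): Array[Filter] = pushed

    // Records the columns the query actually needs.
    def pruneColumns(requiredColumns: Seq[String]): Unit = required = requiredColumns

    def requiredColumns: Seq[String] = required
  }
}
```

With partition column dt, a filter on dt would be recorded as pushed, while a substring filter on another column would be returned to the engine as a post-scan filter.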

Table Meta Management

We implement the CatalogPlugin interface to manage the metadata of Hudi tables and define the core abstraction, called HoodieCatalog. The code sample is below.

class HoodieCatalog extends DelegatingCatalogExtension
  with StagingTableCatalog {
  override def loadTable(ident: Identifier): Table = {
    // HoodieInternalV2Table
  }

  override def createTable(ident: Identifier, 
                           schema: StructType, 
                           partitions: Array[Transform], 
                           properties: util.Map[String, String]): Table = {
      // create hudi table
  }
    
  override def dropTable(ident: Identifier): Boolean = {
    // drop hudi table
  }
    
  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
    // check schema compatibility
    // HoodieInternalV2Table
  }
    
  override def stageReplace(ident: Identifier, 
                            schema: StructType, 
                            partitions: Array[Transform], 
                            properties: util.Map[String, String]): StagedTable = {
    // StagedHoodieTable
  }
    
  override def stageCreateOrReplace(ident: Identifier, 
                                    schema: StructType, 
                                    partitions: Array[Transform], 
                                    properties: util.Map[String, String]): StagedTable = {
    // StagedHoodieTable
  }
} 

Users would set the Spark session config spark.sql.catalog.spark_catalog to org.apache.hudi.catalog.HoodieCatalog to load HoodieCatalog, which manages Hudi tables.
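As a minimal sketch, the catalog could be configured when building the session, for example in spark-shell style. This assumes Spark 3.2.0 or later and a Hudi bundle on the classpath that ships the HoodieCatalog class proposed here; the application name and master are placeholders.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a Hudi bundle providing org.apache.hudi.catalog.HoodieCatalog is on the classpath.
val spark = SparkSession.builder()
  .appName("hudi-catalog-sketch") // placeholder name
  .master("local[*]")             // placeholder master for local experiments
  .config("spark.sql.catalog.spark_catalog", "org.apache.hudi.catalog.HoodieCatalog")
  .getOrCreate()
```

The same key and value could instead be passed with --conf on the command line or set in spark-defaults.conf.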

Rollout/Adoption Plan

  • What impact (if any) will there be on existing users?

There is no impact on existing users, but users would need to specify the new catalog to manage Hudi tables or other tables.

  • If we are changing behavior how will we phase out the older behavior?

We should keep compatibility with the V1 API and make the migration to the V2 API transparent for users.

Test Plan

[ ] PoC for catalog plugin
[ ] PoC for writing path with UTs
[ ] PoC for querying path with UTs
[ ] E2E tests
[ ] Benchmark for v1 and v2 writing and querying