JIRA: https://issues.apache.org/jira/browse/HUDI-3478
The Change-Data-Capture (CDC) feature enables Hudi to show how records were changed by producing the changes, and therefore to handle CDC query use cases.
In cases where Hudi tables are used as streaming sources, we want to know exactly which records were changed in each commit: which were inserted, deleted, and updated. And for updated records, the old values before the update and the new ones after it.
To implement this feature, we need to implement logic on both the write and read paths so that Hudi can figure out the changed data when it is read. In some cases, we also need to write extra data to help optimize CDC queries.
Here is a simple case to explain how CDC works.
We follow the debezium output format: four columns as shown below: `op`, `ts_ms`, `before` and `after`.

The `op` column has three enum values:

- `i` (insert): when `op` is `i`, `before` is always null;
- `u` (update): when `op` is `u`, neither `before` nor `after` is null;
- `d` (delete): when `op` is `d`, `after` is always null.

Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in the `before` and `after` columns.
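To make the format concrete, here is a minimal sketch (a hypothetical case class for illustration only, not an actual Hudi type) showing one record of each kind in this four-column shape:

```scala
// Illustrative sketch only: a hypothetical model of the debezium-style
// change record described above.
case class CDCRecord(
    op: String,     // "i", "u" or "d"
    tsMs: Long,     // timestamp of the change, in milliseconds
    before: String, // image before the change (JSON here); null when op = "i"
    after: String   // image after the change (JSON here); null when op = "d"
)

// An insert has no `before`, an update has both images, a delete has no `after`.
val changes = Seq(
  CDCRecord("i", 1650967800000L, null, """{"id": 1, "price": 10.0}"""),
  CDCRecord("u", 1650968400000L, """{"id": 1, "price": 10.0}""", """{"id": 1, "price": 12.0}"""),
  CDCRecord("d", 1650969000000L, """{"id": 1, "price": 12.0}""", null)
)
```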
Note: table operations like `Compact`, `Clean` and `Index` do not write or change any data, so we don't need to consider them in the CDC scenario.
Define new configs:
key | default | description |
---|---|---|
hoodie.table.cdc.enabled | false | When true, the table can be used for CDC queries and cdc data will be written when needed. |
hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including `before` and `after`. Otherwise, persist only the `op` and the record key. |
Define another query mode named `cdc`, similar to `snapshot`, `read_optimized` and `incremental`. To read in cdc mode, set `hoodie.datasource.query.type` to `cdc`. Other existing configs that can be reused in cdc mode are as follows:
key | default | description |
---|---|---|
hoodie.datasource.query.type | snapshot | set to cdc to enable the cdc query mode |
hoodie.datasource.read.start.timestamp | - | required; the start timestamp of the CDC query |
hoodie.datasource.read.end.timestamp | - | optional; the end timestamp of the CDC query. If unset, query changes up to the latest instant |
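For illustration, a cdc read combining these configs might look like the following sketch (key names as defined in the table above):

```scala
// Sketch only: a cdc-mode read using the configs defined above.
val cdcDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "cdc").
  option("hoodie.datasource.read.start.timestamp", "20220426103000000"). // required
  option("hoodie.datasource.read.end.timestamp", "20220426113000000").   // optional
  load("/path/to/hudi")
```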
Here we define 5 cdc file types in the CDC scenario.

- `CDC_LOG_FILE`: when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`; when querying the Hudi table in cdc query mode, we load this file and return it directly. When `hoodie.table.cdc.supplemental.logging` is false, it keeps just the `op` and the key of the changed record; when querying in cdc query mode, we need to load the previous and the current version of the touched file slice to extract the other info like `before` and `after` on the fly.
- `ADD_BASE_FILE`: a base file of a new file slice. We load this file and treat each record as the value of `after`; the value of `op` of each record is `i`.
- `REMOVE_BASE_FILE`: a base file that was removed. We load the previous version of the file and treat each record as the value of `before`; the value of `op` of each record is `d`.
- `MOR_LOG_FILE`: a normal MOR log file. For each record in it, we need the previous version of the file slice to extract `before` and `after`.
- `REPLACED_FILE_GROUP`: a file group replaced by `DELETE_PARTITION` or `INSERT_OVERWRITE` operations. We load this file group and treat all the records as the value of `before`; the value of `op` of each record is `d`
.

Note: `CDC_LOG_FILE` is a new file type that is written out by CDC. `ADD_BASE_FILE`, `REMOVE_BASE_FILE`, `MOR_LOG_FILE` and `REPLACED_FILE_GROUP` are just representations of the existing data files in the CDC scenario. For example: an `INSERT` operation may create a list of new data files, which will be treated as `ADD_BASE_FILE`; a `DELETE_PARTITION` operation will replace a list of file slices, and for each of these we get the cdc data in the `REPLACED_FILE_GROUP` way.

The idea is to write CDC files as little as possible, and reuse data files as much as possible.
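To summarize the mapping above, here is a hypothetical sketch (the names are illustrative, not actual Hudi types) of how each file type contributes change data:

```scala
// Hypothetical sketch of the five CDC file types and how each one
// is turned into change data, as described above.
sealed trait CdcFileType
case object CdcLogFile        extends CdcFileType // materialized change data, read directly
case object AddBaseFile       extends CdcFileType // every record -> after, op = "i"
case object RemoveBaseFile    extends CdcFileType // every record -> before, op = "d"
case object MorLogFile        extends CdcFileType // needs the previous file slice to infer before/after
case object ReplacedFileGroup extends CdcFileType // every record -> before, op = "d"
```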
Hudi writes data through `HoodieWriteHandle`. We notice that only `HoodieMergeHandle` and its subclasses receive both the old record and the new-coming record at the same time, merge them and write the result. So we will add a `LogFormatWriter` in these classes. If there is CDC data to be written out, this writer writes out a log file that consists of a `CDCBlock`. The CDC log files are placed in the same location as the base files and other log files, so that the clean service can clean them up without extra work. The file structure looks like:
```
hudi_cdc_table/
  .hoodie/
    hoodie.properties
    00001.commit
    00002.replacecommit
    ...
  year=2021/
    filegroup1-instant1.parquet
    .filegroup1-instant1.log
  year=2022/
    filegroup2-instant1.parquet
    .filegroup2-instant1.log
  ...
```
Under a partition directory, the `.log` file with the `CDCBlock` above keeps the change data we have to materialize.
There is an option that controls what data is written to the `CDCBlock`: `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
Spark DataSource example:
```scala
df.write.format("hudi").
  options(commonOpts).
  option("hoodie.table.cdc.enabled", "true").
  option("hoodie.table.cdc.supplemental.logging", "true").    // enable cdc supplemental logging
  // option("hoodie.table.cdc.supplemental.logging", "false"). // disable cdc supplemental logging
  save("/path/to/hudi")
```
Spark SQL example:
```sql
-- create a hudi table that enables cdc
create table hudi_cdc_table (
  id int,
  name string,
  price double,
  ts long
) using hudi
tblproperties (
  'primaryKey' = 'id',
  'preCombineField' = 'ts',
  'hoodie.table.cdc.enabled' = 'true',
  'hoodie.table.cdc.supplemental.logging' = 'true',
  'type' = 'cow'
)
```
This part discusses how to make Spark (including Spark DataFrame, SQL, and Streaming) read the Hudi CDC data.
Implement `CDCReader`, which does these steps to respond to a CDC request:

- check that the table has `hoodie.table.cdc.enabled` set to true, and that the query range is valid;
- extract the commits touched by the query range from the `ActiveTimeline`;
- for each of these commits, load the relevant changed files, extract the change data, and return the union as a `DataFrame`.

```scala
class CDCReader(
    metaClient: HoodieTableMetaClient,
    options: Map[String, String]
) extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = {
    // 'op', 'source', 'ts_ms', 'before', 'after'
  }

  override def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter]): RDD[Row] = {
    // ...
  }
}
```
Note:

- `CDCReader` manages all the things about CDC, and all the Spark entrances (DataFrame, SQL, Streaming) call the functions in `CDCReader`.
- When `hoodie.table.cdc.supplemental.logging` is false, we need to do more work to get the change data. The following illustration explains the difference when this config is true or false.
- Reading a COW table in CDC query mode is equivalent to reading a simplified MOR table that has no normal log files.
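When supplemental logging is disabled, only `op` and the record key are persisted, so the reader has to rebuild the images. A minimal sketch of that inference (the key-indexed maps are purely an illustration, not the real implementation):

```scala
import org.apache.spark.sql.Row

// Sketch only: rebuild before/after from the previous and current version
// of the touched file slice, given only op and the record key.
def inferImages(op: String, key: String,
                prevSlice: Map[String, Row],
                currSlice: Map[String, Row]): (Option[Row], Option[Row]) =
  op match {
    case "i" => (None, currSlice.get(key))               // insert: before is null
    case "u" => (prevSlice.get(key), currSlice.get(key)) // update: both images
    case "d" => (prevSlice.get(key), None)               // delete: after is null
  }
```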
According to the design of the writing part, when writing a MOR table, only the cases that write out a base file (i.e. that call `HoodieMergeHandle` and its subclasses) will write out cdc files. In other words, cdc files will be written out only for index and file-sizing reasons.
Here is an illustration that explains how we can query the CDC on a MOR table in various cases.
Spark DataSource
```scala
spark.read.format("hudi").
  option("hoodie.datasource.query.type", "cdc").
  option("hoodie.datasource.read.begin.instanttime", "20220426103000000").
  option("hoodie.datasource.read.end.instanttime", "20220426113000000").
  load("/path/to/hudi")
```
Spark SQL
```sql
-- query the CDC data between 20220426103000000 and 20220426113000000;
select * from hudi_table_changes("hudi_cdc_table", "20220426103000000", "20220426113000000");

-- query the CDC data since 20220426103000000;
select * from hudi_table_changes("hudi_cdc_table", "20220426103000000");
```
Spark Streaming
```scala
val df = spark.readStream.format("hudi").
  option("hoodie.datasource.query.type", "cdc").
  load("/path/to/hudi")

// launch a streaming query which starts from the current snapshot of the hudi table,
// and outputs at the console.
val stream = df.writeStream.format("console").start
```
The CDC feature can be enabled by the corresponding configuration, which defaults to false. Using this feature does not depend on the Spark version.