JIRA: https://issues.apache.org/jira/browse/HUDI-3478
The Change-Data-Capture (CDC) feature enables Hudi to show how records were changed by producing the change data, and therefore to handle CDC query use cases.
When Hudi tables are used as streaming sources, we want to know exactly how records changed in each commit: which records were inserted, deleted, and updated, and, for updated records, the old values before the update and the new values after it.
To implement this feature, we need logic on both the write and read paths so that Hudi can figure out the changed data when queried. In some cases, we also need to write extra data to help optimize CDC queries.
The diagram below illustrates a typical CDC scenario.
We follow the Debezium output format: four columns as shown below.

- op: the operation of this change, with three enum values:
  - i (insert): when op is i, before is always null;
  - u (update): when op is u, both before and after are not null;
  - d (delete): when op is d, after is always null.
- ts_ms: the timestamp of this change.
- before: the record image before this change.
- after: the record image after this change.

Note: Hudi metadata columns such as _hoodie_commit_time are omitted from the before and after columns in this illustration.
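For illustration, assuming a record with id=1 whose price changes from 10.0 to 12.0 in a commit, the change would surface as a row like the following (values are hypothetical):

op | ts_ms | before | after |
---|---|---|---|
u | 1653032000000 | {"id": 1, "price": 10.0} | {"id": 1, "price": 12.0} |

An insert would carry before=null, and a delete would carry after=null.
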
key | default | description |
---|---|---|
hoodie.table.cdc.enabled | false | The master switch of the CDC features. If true , writers and readers will respect CDC configurations and behave accordingly. |
hoodie.table.cdc.supplemental.logging.mode | KEY_OP | A mode to indicate the level of changed data being persisted. At the minimum level, KEY_OP indicates changed records' keys and operations to be persisted. DATA_BEFORE : persist records' before-images in addition to KEY_OP . DATA_BEFORE_AFTER : persist records' after-images in addition to DATA_BEFORE . |
To perform CDC queries, users need to set hoodie.datasource.query.incremental.format=cdc and hoodie.datasource.query.type=incremental.
key | default | description |
---|---|---|
hoodie.datasource.query.type | snapshot | set to incremental for incremental query. |
hoodie.datasource.query.incremental.format | latest_state | latest_state (current incremental query behavior) returns the latest records' values. Set to cdc to return the full CDC results. |
hoodie.datasource.read.begin.instanttime | - | Required. The instant time to start the incremental query from. |
hoodie.datasource.read.end.instanttime | - | Optional. The instant time to end the incremental query at. |
supplemental.logging.mode=KEY_OP
In this mode, we minimize the additional storage for CDC information.

- Only the op values and record keys are persisted.
- When op values and record keys are available, inference using the current and previous committed data can be optimized by reducing the IO cost of reading the previously committed data, i.e., only the changed records need to be read (see the sketch below).

The detailed logical flows for write and read scenarios are the same regardless of logging.mode, and will be illustrated in the sections below.
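As referenced in the bullet above, here is a minimal sketch of that key-based lookup, assuming plain Scala collections stand in for the persisted key/op pairs and the previous file slice (the names below are illustrative, not Hudi's actual classes):

```scala
// Persisted CDC info in KEY_OP mode: only (recordKey, op) pairs, no before/after images.
val persistedKeyOps: Seq[(String, String)] = Seq(("key1", "u"), ("key2", "d"), ("key3", "i"))

// Only updates and deletes need a before-image from the previous file slice.
val keysNeedingBefore: Set[String] =
  persistedKeyOps.collect { case (key, op) if op != "i" => key }.toSet

// Previous file slice modeled as recordKey -> payload.
val previousFileSlice: Map[String, String] =
  Map("key1" -> "old-value-1", "key2" -> "old-value-2", "key9" -> "old-value-9")

// Read back only the changed records instead of the whole previous file slice.
val beforeImages: Map[String, String] =
  previousFileSlice.filter { case (key, _) => keysNeedingBefore(key) }
```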
supplemental.logging.mode=DATA_BEFORE or DATA_BEFORE_AFTER
Overall logic flows are illustrated below.
Hudi writes data via HoodieWriteHandle. We notice that only HoodieMergeHandle and its subclasses receive both the old record and the new incoming record at the same time, merge them, and write the result. So we will add a LogFormatWriter in these classes. If there is CDC data that needs to be written out, this writer writes a log file consisting of a CDCBlock. The CDC log files have a -cdc suffix (to distinguish them from data log files, for read performance considerations) and are placed in the same paths as the base files and other log files. The clean service needs to be tweaked accordingly. An example of the file structure:
```
cow_table/
  .hoodie/
    hoodie.properties
    00001.commit
    00002.replacecommit
    ...
  year=2021/
    filegroup1-instant1.parquet
    .filegroup1-instant1-cdc
  year=2022/
    filegroup2-instant1.parquet
    .filegroup2-instant1-cdc
  ...
```
Under partition directories, the -cdc
files with CDCBlock
as shown above contain the persisted changed data.
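To make the write-path behavior concrete, below is a self-contained sketch of how the CDC rows for one file group could be derived from its previous and current records before being serialized into a CDCBlock; plain Scala maps stand in for the merged records, and CdcRow and deriveCdc are illustrative names, not Hudi's actual API.

```scala
// One CDC row as persisted in DATA_BEFORE_AFTER mode.
case class CdcRow(op: String, key: String, before: Option[String], after: Option[String])

// Derive CDC rows by comparing the records of a file group before and after a commit,
// keyed by record key: new keys are inserts, missing keys are deletes, changed values are updates.
def deriveCdc(previous: Map[String, String], current: Map[String, String]): Seq[CdcRow] = {
  val inserts = (current.keySet -- previous.keySet).toSeq
    .map(k => CdcRow("i", k, None, Some(current(k))))
  val deletes = (previous.keySet -- current.keySet).toSeq
    .map(k => CdcRow("d", k, Some(previous(k)), None))
  val updates = (previous.keySet & current.keySet).toSeq
    .filter(k => previous(k) != current(k))
    .map(k => CdcRow("u", k, Some(previous(k)), Some(current(k))))
  inserts ++ deletes ++ updates
}

// These rows would then be written out as a CDCBlock into the "-cdc" log file
// next to the base file of the same file group.
```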
There are two design choices on when to persist CDC data in MOR tables:

- Write-on-indexing allows CDC info to be persisted at the earliest; however, in the case of the Flink writer or bucket indexing, the op (I/U/D) data is not available at indexing time.
- Write-on-compaction can always persist CDC info and achieves a standardized implementation across engines; however, it adds some delay to the CDC query results. Based on the business requirements, log compaction (RFC-48) or scheduling more frequent compactions can be used to minimize the latency.

The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.

- op=I will be persisted when inserts are written to base files.
- op=U|D will be persisted when compaction merges updates/deletes from log files into base files.
- op=I|U|D will be persisted when compaction merges log files into base files for writers that send all changes to log files (e.g., the Flink writer or bucket indexing).

In summary, we propose that CDC data should be persisted synchronously upon base file generation. It is therefore write-on-indexing for Spark inserts (non-bucket index) and write-on-compaction for everything else.
In the future, a dedicated ChangeTrackingService could be introduced, which can be scheduled to fine-tune the CDC availability SLA, effectively decoupling it from the compaction frequency.

Spark DataSource:
```scala
df.write.format("hudi").
  options(commonOpts).
  option("hoodie.table.cdc.enabled", "true").
  option("hoodie.table.cdc.supplemental.logging.mode", "DATA_BEFORE_AFTER").
  save("/path/to/hudi")
```
Spark SQL:
```sql
-- create a hudi table with cdc enabled
create table hudi_cdc_table (
  id int,
  name string,
  price double,
  ts long
) using hudi
tblproperties (
  'primaryKey' = 'id',
  'preCombineField' = 'ts',
  'hoodie.table.cdc.enabled' = 'true',
  'hoodie.table.cdc.supplemental.logging.mode' = 'DATA_BEFORE_AFTER',
  'type' = 'cow'
)
```
HoodieCDCInferenceCase | Infer case details | Infer logic | Note |
---|---|---|---|
AS_IS | CDC file written (suffix contains -cdc ) alongside base files (COW) or log files (MOR) | CDC info will be extracted as is | the read-optimized way to read CDC |
BASE_FILE_INSERT | Base files were written to a new file group | All records (in the current commit): op=I , before=null , after=<current value> | on-the-fly inference |
BASE_FILE_DELETE | Records are not found in the current commit's base file but are found in the previous commit's base file within the same file group | All records (in the previous commit): op=D , before=<previous value> , after=null | on-the-fly inference |
LOG_FILE | For MOR, log files are read and records are looked up in the previous file slice. | Current record read from delete block, found in previous file slice => op=D , before=<previous value> , after=null | on-the-fly inference |
LOG_FILE | ditto | Current record read from delete block, not found in previous file slice => skip due to the delete log block should be discarded (trying to delete non-existing records) | on-the-fly inference |
LOG_FILE | ditto | Current record not read from delete block, found in previous file slice => op=U , before=<previous value> , after=<current value> | on-the-fly inference |
LOG_FILE | ditto | Current record not read from delete block, not found in previous file slice => op=I , before=null , after=<current value> | on-the-fly inference |
REPLACE_COMMIT | File group corresponds to a replace commit | All records op=D , before=<value from the file group> , after=null | on-the-fly inference |
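A minimal sketch of the LOG_FILE inference rules in the table above, assuming a record has already been read from a log block and looked up in the previous file slice (the types and names below are illustrative, not Hudi's actual classes):

```scala
// Outcome of inferring the change for one record read from a MOR log file.
sealed trait InferredChange
case class Change(op: String, before: Option[String], after: Option[String]) extends InferredChange
case object Skip extends InferredChange

// fromDeleteBlock: whether the record was read from a delete block.
// currentValue:    the value carried by the current log block (None for delete blocks).
// previousValue:   the value found in the previous file slice, if any.
def inferLogFileCase(
    fromDeleteBlock: Boolean,
    currentValue: Option[String],
    previousValue: Option[String]): InferredChange =
  (fromDeleteBlock, previousValue) match {
    case (true, Some(prev))  => Change("d", Some(prev), None)          // delete of an existing record
    case (true, None)        => Skip                                   // deleting a non-existing record: discard
    case (false, Some(prev)) => Change("u", Some(prev), currentValue)  // update
    case (false, None)       => Change("i", None, currentValue)        // insert
  }
```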
This section uses Spark (incl. Spark DataFrame, SQL, Streaming) as an example to perform CDC-format incremental queries.
Implement a CDCReader that does these steps to respond to a CDC request:
- check if hoodie.table.cdc.enabled=true, and if the query range is valid;
- extract the commits within the query range from the ActiveTimeline;
- for each of these commits, read or infer the changed data and assemble the result DataFrame.

```scala
class CDCReader(
    metaClient: HoodieTableMetaClient,
    options: Map[String, String]
  ) extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = {
    // 'op', 'source', 'ts_ms', 'before', 'after'
  }

  override def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter]): RDD[Row] = {
    // ...
  }
}
```
Note:

- CDCReader manages everything related to CDC, and all the Spark entrances (DataFrame, SQL, Streaming) call the functions in CDCReader.
- When hoodie.table.cdc.supplemental.logging.mode=KEY_OP, we need to compute the changed data. The following illustrates the difference.

CDC queries always extract and return the persisted CDC data.

According to the section "Persisting CDC in MOR", persisted CDC data is available upon base file generation. The CDC-type query results should therefore consist of on-the-fly read results and as-is read results. See the Read section.
The implementation should
Here is an illustration to explain how we can query the CDC data on a MOR table in various cases.
Spark DataSource
```scala
spark.read.format("hudi").
  option("hoodie.datasource.query.type", "incremental").
  option("hoodie.datasource.query.incremental.format", "cdc").
  option("hoodie.datasource.read.begin.instanttime", "20220426103000000").
  option("hoodie.datasource.read.end.instanttime", "20220426113000000").
  load("/path/to/hudi")
```
Spark SQL
```sql
-- query the CDC data between 20220426103000000 and 20220426113000000;
select * from hudi_table_changes("hudi_cdc_table", "20220426103000000", "20220426113000000");

-- query the CDC data since 20220426103000000;
select * from hudi_table_changes("hudi_cdc_table", "20220426103000000");
```
Spark Streaming
```scala
val df = spark.readStream.format("hudi").
  option("hoodie.datasource.query.type", "incremental").
  load("/path/to/hudi")

// launch a streaming query which starts from the current snapshot of the hudi table,
// and outputs to the console.
val stream = df.writeStream.format("console").start
```
- Spark support phase 1: persisting CDC data (OP=I when writing inserts to base files) and CDC on-the-fly inferring read.
- Spark support phase 2: persisting CDC data (OP=U/D when compacting updates/deletes from log files) and CDC read that combines on-the-fly inferred data and persisted CDC data. HoodieMergedLogRecordScanner needs to support producing CDC data for each version of the changed records. HoodieCompactor and HoodieMergeHandle are to adapt to the changes. See HUDI-4705.
- Flink support can be developed in parallel, and can make use of the common logical changes of CDC write via compaction in Spark support phase 2.
For supplemental.logging.mode=DATA_BEFORE or DATA_BEFORE_AFTER