JIRA: https://issues.apache.org/jira/browse/HUDI-7947
In streaming processing, there are often scenarios where a table needs to be widened. Today, real-time wide tables are mostly assembled through multi-layer Flink joins, and Flink joins cache large amounts of data in the state backend. As the data set grows, the pressure on the Flink task's state backend keeps increasing and the job may even become unavailable. In multi-layer join scenarios this problem is even more pronounced.
1.x also supports partial updates encoded in log files, which should be able to handle this scenario. But even with partial updates, column families will reduce write amplification during compaction.
So, the main gains of clustering columns for wide tables are:
Write performance:
Read performance:
Since the data is already sorted when it is written, a sort merge can be used directly to merge the data; compared with the native bucket table, read performance improves considerably and memory consumption drops significantly.
Compaction performance:
The merge logic of compaction is the same as that of reading. Compaction costs across column families are where the real savings are.
The log merge can be made pluggable so that either hash merge or sort merge is chosen; we need to introduce new log headers or a standard mechanism for the merger to determine whether the base file and log files are sorted.
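As a rough sketch of how that pluggable choice could look (the enum, flag, and selector names below are hypothetical, not existing Hudi APIs), the merger would fall back to hash merge unless both the base file and every log block declare themselves sorted:

```java
import java.util.List;

// Hypothetical sketch of a pluggable log merge choice; none of these names exist in Hudi today.
public final class LogMergeStrategySelector {

  public enum LogMergeStrategy { HASH_MERGE, SORT_MERGE }

  /**
   * Sort merge is only safe when the base file and every log block advertise that
   * they are sorted by record key (e.g. via a new, hypothetical IS_SORTED log block
   * header); otherwise fall back to the existing hash merge.
   */
  public static LogMergeStrategy select(boolean baseFileSorted, List<Boolean> logBlockSortedFlags) {
    boolean allLogBlocksSorted = logBlockSortedFlags.stream().allMatch(Boolean::booleanValue);
    return (baseFileSorted && allLogBlocksSorted)
        ? LogMergeStrategy.SORT_MERGE
        : LogMergeStrategy.HASH_MERGE;
  }

  private LogMergeStrategySelector() {}
}
```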
Currently, Hudi organizes data at fileGroup granularity. We further divide the fileGroup into column clusters, introducing the columnFamily concept.
Hudi files are organized according to the following rules:
The data in a partition is divided into buckets by hash (each bucket maps to a file group); the files in each bucket are divided by columnFamily; the multiple columnFamily files in a bucket together form a complete fileGroup; when there is only one columnFamily, this degenerates into the native Hudi bucket table.
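A minimal sketch of how a record could be located under this layout (hypothetical helper, not Hudi's real bucket index code):

```java
import java.util.Map;

// Hypothetical illustration of the layout rules above; not the actual Hudi bucket index implementation.
public final class ColumnFamilyLayout {

  /** Hash the record key to a bucket; each bucket maps to exactly one file group. */
  public static int bucketOf(String recordKey, int numBuckets) {
    return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  /**
   * Within a bucket, every column is owned by exactly one column family (the primary key
   * is present in all of them). With a single family, the layout is identical to the
   * native bucket table.
   */
  public static String familyOf(String columnName, Map<String, String> columnToFamily) {
    return columnToFamily.getOrDefault(columnName, "default");
  }

  private ColumnFamilyLayout() {}
}
```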
This feature should be implemented for both Spark and Flink, so a table written this way by Flink can also be read by Spark.
After the column family is introduced, the storage structure of the entire Hudi bucket table changes:
Each bucket is divided into multiple columnFamilies by column cluster. When there is only one columnFamily, it automatically degenerates into the native bucket table.
After splitting the fileGroup by columnFamily, the naming rules for base files and log files change: we append the cfName as a suffix to every file name so that Hudi itself can distinguish column families. If the suffix is not present, we assume the default column family. So, the new file name templates will be as follows:
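The concrete templates are part of the detailed design; purely as an illustration of the suffix rule described above (the helper names and the exact suffix position are assumptions), naming and parsing could look like:

```java
import java.util.Set;

// Hypothetical illustration of the naming rule only; the real templates are defined in the detailed design.
public final class ColumnFamilyFileNaming {

  public static final String DEFAULT_CF = "default";

  /** Assumption: the column family name is appended as a trailing suffix to the existing file name. */
  public static String withColumnFamilySuffix(String existingFileName, String cfName) {
    return existingFileName + "." + cfName;
  }

  /** Files written without a suffix are treated as belonging to the default column family. */
  public static String parseColumnFamily(String fileName, Set<String> knownFamilies) {
    int lastDot = fileName.lastIndexOf('.');
    String candidate = lastDot >= 0 ? fileName.substring(lastDot + 1) : "";
    return knownFamilies.contains(candidate) ? candidate : DEFAULT_CF;
  }

  private ColumnFamilyFileNaming() {}
}
```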
Also, we should evolve the metadata table files schema to additionally track a column family name.
In the table creation statement, the column family division is specified in the options/tblproperties attributes; column family attributes are specified in key-value form:
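For example, a hedged sketch of a Flink SQL table creation: the property key pattern hoodie.columnfamily.&lt;name&gt; and the family names are hypothetical placeholders for the final option names; only 'connector' and 'path' are standard Hudi Flink options.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateColumnFamilyTableExample {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // The column family keys below are hypothetical; each value lists the columns of one
    // family, and every family carries the primary key 'id'.
    tEnv.executeSql(
        "CREATE TABLE wide_table (\n"
            + "  id BIGINT,\n"
            + "  name STRING,\n"
            + "  price DOUBLE,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  PRIMARY KEY (id) NOT ENFORCED\n"
            + ") WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/wide_table',\n"
            + "  'hoodie.columnfamily.cf_base'  = 'id,name',\n"
            + "  'hoodie.columnfamily.cf_price' = 'id,price,ts'\n"
            + ")");
  }
}
```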
Constraints: every column family must contain the primary key, and the columns of different column families cannot overlap except for the primary key. The preCombine field does not need to be specified; if it is not specified, the primary key is used by default.
After the table is created, the column family attributes will be persisted to hoodie's metadata for subsequent use.
Use the SQL ALTER command to modify the column family attributes and persist them:
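Continuing the sketch above (the property key name is still hypothetical), one family's column list could be changed with Flink SQL's ALTER TABLE ... SET:

```java
// Reuses the tEnv from the previous sketch; 'hoodie.columnfamily.cf_price' remains a hypothetical key.
tEnv.executeSql(
    "ALTER TABLE wide_table SET (\n"
        + "  'hoodie.columnfamily.cf_price' = 'id,price,ts,discount'\n"
        + ")");
```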
Specific steps are as follows:
The Hudi kernel divides the input data by column family; the data belonging to a given column family is sorted and written directly to that column family's log file.
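A minimal sketch of that split-and-sort step, using hypothetical record and schema types:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the write-side split: project each incoming row into one sub-record
// per column family (always carrying the primary key), then sort each family's batch by key
// before it is appended to that family's log file.
public final class ColumnFamilySplitter {

  public static Map<String, List<Map<String, Object>>> splitAndSort(
      List<Map<String, Object>> rows,
      Map<String, List<String>> familyToColumns,
      String primaryKeyColumn) {

    Map<String, List<Map<String, Object>>> perFamily = new HashMap<>();
    for (Map.Entry<String, List<String>> family : familyToColumns.entrySet()) {
      List<Map<String, Object>> subRecords = new ArrayList<>();
      for (Map<String, Object> row : rows) {
        Map<String, Object> subRecord = new HashMap<>();
        subRecord.put(primaryKeyColumn, row.get(primaryKeyColumn));
        for (String column : family.getValue()) {
          subRecord.put(column, row.get(column));
        }
        subRecords.add(subRecord);
      }
      // Sort by primary key so downstream reads and compactions can use sort merge.
      subRecords.sort(Comparator.comparing(r -> String.valueOf(r.get(primaryKeyColumn))));
      perFamily.put(family.getKey(), subRecords);
    }
    return perFamily;
  }

  private ColumnFamilySplitter() {}
}
```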
Specific steps:
After the table's columns are clustered, the write path additionally sorts and splits the data compared with native bucket writing. A new append interface needs to be introduced to support column families.
Introduce ColumnFamilyAppendHandle, extending AppendHandle, to implement column family writing.
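A structural sketch of that handle: the base class below is a simplified stand-in declared locally so the sketch is self-contained, and none of the method shapes match the real append handle's API.

```java
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-in for Hudi's append handle, used only to show the shape.
abstract class AppendHandle {
  abstract void doAppend(List<Map<String, Object>> records);
}

// One handle per column family: it keeps only the columns the family owns (plus the primary
// key), relies on the input already being sorted by key, and appends to the log file that
// carries this family's cfName suffix.
class ColumnFamilyAppendHandle extends AppendHandle {
  private final String cfName;
  private final List<String> ownedColumns;

  ColumnFamilyAppendHandle(String cfName, List<String> ownedColumns) {
    this.cfName = cfName;
    this.ownedColumns = ownedColumns;
  }

  @Override
  void doAppend(List<Map<String, Object>> records) {
    // In the real implementation this would serialize the records into a log block
    // (flagged as sorted) and roll over the column-family-specific log file.
    System.out.printf("append %d records to log file of column family %s (columns %s)%n",
        records.size(), cfName, ownedColumns);
  }
}
```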
Steps of Hudi's internal row reader:
Since the data read by each familyReader is sorted by primary key, the row reader merges the output of the familyReaders in sort-merge-join fashion and returns complete rows.
The whole read path involves a large amount of data merging, but because the data itself is sorted, the memory consumption of the merge is very low and the merge is fast. Compared with Hudi's native merging method, both memory pressure and merge time are significantly reduced.
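A compact sketch of that sort-merge assembly, using hypothetical record types (the real reader streams rows instead of materializing a list):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hedged sketch: every per-column-family iterator is already sorted by primary key,
 * so complete rows can be assembled with a single forward pass and O(#families) memory.
 */
public final class SortMergeFamilyReader {

  /** Minimal stand-in for a partial row read from one column family. */
  public static final class FamilyRecord {
    public final String key;                  // primary key value
    public final Map<String, Object> columns; // columns owned by this family
    public FamilyRecord(String key, Map<String, Object> columns) {
      this.key = key;
      this.columns = columns;
    }
  }

  /** Assembles complete rows; heads[i] holds the current record of family i. */
  public static List<Map<String, Object>> merge(List<Iterator<FamilyRecord>> families) {
    List<FamilyRecord> heads = new ArrayList<>();
    for (Iterator<FamilyRecord> it : families) {
      heads.add(it.hasNext() ? it.next() : null);
    }
    List<Map<String, Object>> rows = new ArrayList<>();
    while (true) {
      // Find the smallest primary key among the current heads.
      String minKey = null;
      for (FamilyRecord head : heads) {
        if (head != null && (minKey == null || head.key.compareTo(minKey) < 0)) {
          minKey = head.key;
        }
      }
      if (minKey == null) {
        return rows; // all family iterators exhausted
      }
      // Stitch together every family that has a record for this key, then advance it.
      Map<String, Object> row = new HashMap<>();
      for (int i = 0; i < heads.size(); i++) {
        FamilyRecord head = heads.get(i);
        if (head != null && head.key.equals(minKey)) {
          row.putAll(head.columns);
          heads.set(i, families.get(i).hasNext() ? families.get(i).next() : null);
        }
      }
      rows.add(row);
    }
  }

  private SortMergeFamilyReader() {}
}
```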
Extend Hudi's compaction schedule module to merge each column family's own base file and log file:
Extend Hudi's compaction schedule module to merge and update all column families in the entire table.
After merging at the column family level, the multiple column families are finally merged into complete rows and saved.
Full compaction will be optional; only column-family-level compaction is required.
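A small sketch of how the schedule could express the two levels, with hypothetical plan types and a flag for the optional full compaction (not Hudi's real compaction classes):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical planner: column-family-level compaction is always scheduled, full compaction only on demand.
public final class ColumnFamilyCompactionPlanner {

  public static final class Operation {
    public final String fileGroupId;
    public final String columnFamily; // null means "full": stitch all families of the file group
    public Operation(String fileGroupId, String columnFamily) {
      this.fileGroupId = fileGroupId;
      this.columnFamily = columnFamily;
    }
  }

  /**
   * One operation per (fileGroup, columnFamily) pair merges that family's base and log files.
   * When fullCompaction is enabled, an extra operation per file group sort-merges the
   * per-family results into complete rows.
   */
  public static List<Operation> plan(List<String> fileGroupIds, List<String> families, boolean fullCompaction) {
    List<Operation> ops = new ArrayList<>();
    for (String fileGroupId : fileGroupIds) {
      for (String family : families) {
        ops.add(new Operation(fileGroupId, family));
      }
      if (fullCompaction) {
        ops.add(new Operation(fileGroupId, null));
      }
    }
    return ops;
  }

  private ColumnFamilyCompactionPlanner() {}
}
```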
Different users need different columns: some need columns from multiple column families, while others need columns from only one column family. It is better to let users choose whether to enable full compaction.
Besides, after full compaction, projection on the reader side is less efficient, because projection can only be applied to the full Parquet file containing all fields instead of being pruned by column family.
ColumnFamily can be used not only for ML scenarios and AI feature tables, but also to simulate multi-stream joins that concatenate wide tables. When simulating a multi-stream join, Hudi should produce complete rows, so full compaction is needed in that case.
This is a brand-new feature: if it is not actively turned on, the column family code paths are never reached.
Business behavior compatibility: no impact; this feature is not turned on by default, so the column family logic is not enabled.
Syntax compatibility: no impact; the column family attributes live in the table properties and are set through standard SQL syntax.
Compatibility of data type handling: no impact; this design does not modify the underlying data field format.
List to check that the implementation works as expected: