JIRA: https://issues.apache.org/jira/browse/HUDI-2703
Hudi maintains several indices to locate/map incoming records to file groups during writes. The most commonly used record index is the HoodieBloomIndex. It has performance issues with larger tables and with the global index, because the bloom filters from a large number of data files need to be read and looked up. Reading from many files on cloud object storage like S3 also runs into request throttling. We are proposing to build a new metadata index (a metadata table based bloom index) to boost the performance of the existing bloom index.
HoodieBloomIndex is used to find the location of incoming records during every write. The bloom index helps Hudi deterministically route records to a given file group and distinguish inserts from updates. This aggregate bloom index is built from the bloom filters stored in the base file footers. Prior to the bloom filter lookup, files are also pruned for the incoming records based on the record key min/max stats stored in the base file footers. In this RFC, we plan to build a new index for the bloom filters under the metadata table to assist in bloom index based record location tagging. This overlaps with RFC-27 (data skipping index), which targets query performance in the read path.
HoodieBloomIndex involves the following steps to find the right location of incoming records:
As steps 1 and 2 show, we need the min and max values of “_hoodie_record_key” and the bloom filters from all interested data files to perform the location tagging. In this design, we will add these key stats and bloom filters to the metadata table, so that we can quickly load the interested details and do faster lookups.
The metadata table already has one partition, files, to help with partition file listing. For the metadata table based indices, we are proposing to add the following two new partitions:
- bloom_filter: for the file level bloom filters
- column_stats: for the key range stats

Why metadata table: the metadata table uses HBase HFile, a tree map file format, to store and retrieve data. HFile is an indexed file format and supports fast, map-like lookups by key. Since we will be storing stats/bloom filters for every file and the index will do lookups based on files, we should benefit from the fast lookups in HFile.
The following sections describe the different partitions and key formats, and then dive into the data and control flows.
A new partition, bloom_filter, will be added under the metadata table. Bloom filters from all the base files in the data table will be added here. The metadata table is already in the HFile format. The existing metadata payload schema will be extended and shared for this partition as well. The type field will be used to detect the bloom filter payload record. Here is the schema for the bloom filter payload record:
{
  "doc": "Metadata Index of bloom filters for all data files in the user table",
  "name": "BloomFilterMetadata",
  "type": [
    "null",
    {
      "doc": "Data file bloom filter details",
      "name": "HoodieMetadataBloomFilter",
      "type": "record",
      "fields": [
        { "doc": "Bloom filter type code", "name": "type", "type": "string" },
        { "doc": "Instant timestamp when this metadata was created/updated", "name": "timestamp", "type": "string" },
        { "doc": "Bloom filter binary byte array", "name": "bloomFilter", "type": "bytes" },
        { "doc": "Bloom filter entry valid/deleted flag", "name": "isDeleted", "type": "boolean" }
      ]
    }
  ]
}
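To make the bloomFilter bytes field concrete, here is a minimal Java sketch that builds a bloom filter from the record keys of one base file and wraps it in a payload whose fields mirror the schema above. The SimpleBloomFilter and BloomFilterPayload classes, the double-hashing scheme, the sizing (4096 bits, 4 hash functions) and the "SIMPLE" type code are illustrative assumptions, not Hudi's BloomFilter implementation.

```java
import java.util.BitSet;
import java.util.List;

// Hypothetical minimal bloom filter (double hashing); not Hudi's implementation.
final class SimpleBloomFilter {
  private final BitSet bits;
  private final int numBits;
  private final int numHashes;

  SimpleBloomFilter(int numBits, int numHashes) {
    this.bits = new BitSet(numBits);
    this.numBits = numBits;
    this.numHashes = numHashes;
  }

  // 32-bit FNV-1a over the UTF-16 chars, used as the second hash.
  private static int fnv1a(String key) {
    int h = 0x811c9dc5;
    for (int i = 0; i < key.length(); i++) {
      h ^= key.charAt(i);
      h *= 0x01000193;
    }
    return h;
  }

  // i-th probe position: (h1 + i * h2) mod numBits.
  private int probe(String key, int i) {
    int h1 = key.hashCode();
    int h2 = fnv1a(key) | 1; // force odd so successive probes move
    return Math.floorMod(h1 + i * h2, numBits);
  }

  void add(String key) {
    for (int i = 0; i < numHashes; i++) {
      bits.set(probe(key, i));
    }
  }

  // false means "definitely absent"; true means "possibly present".
  boolean mightContain(String key) {
    for (int i = 0; i < numHashes; i++) {
      if (!bits.get(probe(key, i))) {
        return false;
      }
    }
    return true;
  }

  byte[] toBytes() {
    return bits.toByteArray();
  }
}

// Hypothetical payload mirroring the HoodieMetadataBloomFilter fields.
final class BloomFilterPayload {
  final String type;        // bloom filter type code (assumed value below)
  final String timestamp;   // instant time that created/updated the entry
  final byte[] bloomFilter; // serialized filter bits
  final boolean isDeleted;  // valid/deleted flag

  BloomFilterPayload(String type, String timestamp, byte[] bloomFilter, boolean isDeleted) {
    this.type = type;
    this.timestamp = timestamp;
    this.bloomFilter = bloomFilter;
    this.isDeleted = isDeleted;
  }

  // Builds the payload for one base file from the record keys written into it.
  static BloomFilterPayload forBaseFile(List<String> recordKeys, String instantTime) {
    SimpleBloomFilter filter = new SimpleBloomFilter(1 << 12, 4);
    recordKeys.forEach(filter::add);
    return new BloomFilterPayload("SIMPLE", instantTime, filter.toBytes(), false);
  }
}
```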
The key for the bloom filter record is an encoded string representing the partition and base file combination. The partition and file names are converted to deterministic hash based IDs, which are then base64 encoded. Hash based IDs are easy to generate both for new incoming insert records and for looking up updated records, and no dictionary is needed for reverse lookups. The number of hash bits is chosen based on the cardinality and the collision probability desired at the maximum supported deployment scale. Base64 encoding the hash IDs further reduces the on-disk storage space for these keys.
key = base64_encode(concat(hash64(partition name), hash128(file name)))
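As a rough check on the chosen hash widths, the bloom_filter key size and the standard birthday-bound approximation for the chance that any two of n distinct names share a b-bit hash (valid while n^2 is much smaller than 2^b) work out as below. The cardinalities used (10^6 partition names, 10^9 file names) are assumed example values, not figures from this RFC.

```latex
\begin{aligned}
|\mathrm{key}_{\mathrm{bloom}}| &= 8 + 16 = 24 \text{ bytes} \;\Rightarrow\; 4\left\lceil 24/3 \right\rceil = 32 \text{ base64 characters} \\
P_{\mathrm{collision}}(n, b) &\approx \frac{n^2}{2^{b+1}} \\
P(10^6,\, 64) &\approx \frac{10^{12}}{2^{65}} \approx 2.7 \times 10^{-8} \\
P(10^9,\, 128) &\approx \frac{10^{18}}{2^{129}} \approx 1.5 \times 10^{-21}
\end{aligned}
```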
Another new partition, column_stats, will also be added under the metadata table to make the record key lookup code path much more performant. This metadata index also helps with data skipping (see RFC-27 for more details). In the context of faster record key lookups for update use cases, we propose using the column_stats index for file pruning when generating the file to candidate keys mapping for the update records. The existing metadata payload schema will be extended and shared for this partition as well. The type field will be used to detect the column stats payload record. Here is the schema for the column stats payload record:
{
  "doc": "Metadata Index of column statistics for all data files in the user table",
  "name": "ColumnStatsMetadata",
  "type": [
    "null",
    {
      "doc": "Data file column statistics",
      "name": "HoodieColumnStats",
      "type": "record",
      "fields": [
        { "doc": "File name for which this column statistics applies", "name": "fileName", "type": [ "null", "string" ] },
        { "doc": "Minimum value in the range. Based on user data table schema, we can convert this to appropriate type", "name": "minValue", "type": [ "null", "string" ] },
        { "doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type", "name": "maxValue", "type": [ "null", "string" ] },
        { "doc": "Total count of values", "name": "valueCount", "type": [ "null", "long" ] },
        { "doc": "Total count of null values", "name": "nullCount", "type": [ "null", "long" ] },
        { "doc": "Total storage size on disk", "name": "totalSize", "type": [ "null", "long" ] },
        { "doc": "Total uncompressed storage size on disk", "name": "totalUncompressedSize", "type": [ "null", "long" ] },
        { "doc": "Column range entry valid/deleted flag", "name": "isDeleted", "type": "boolean" }
      ]
    }
  ]
}
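The following Java sketch shows how the min/max and count fields could be derived for the record key column of one base file, and how the resulting range is then used for pruning. RecordKeyColumnStats and its methods are hypothetical; totalSize and totalUncompressedSize are left out because they come from the file format rather than from the key values, and counting nulls in valueCount is an assumption.

```java
import java.util.List;

// Hypothetical column stats entry for the record key column of one base file.
final class RecordKeyColumnStats {
  final String fileName;
  final String minValue;
  final String maxValue;
  final long valueCount; // assumed to include nulls
  final long nullCount;
  final boolean isDeleted;

  private RecordKeyColumnStats(String fileName, String minValue, String maxValue,
                               long valueCount, long nullCount, boolean isDeleted) {
    this.fileName = fileName;
    this.minValue = minValue;
    this.maxValue = maxValue;
    this.valueCount = valueCount;
    this.nullCount = nullCount;
    this.isDeleted = isDeleted;
  }

  // Scans the record keys of one base file and derives the key range and counts.
  static RecordKeyColumnStats compute(String fileName, List<String> keys) {
    String min = null;
    String max = null;
    long nulls = 0;
    for (String k : keys) {
      if (k == null) {
        nulls++;
        continue;
      }
      if (min == null || k.compareTo(min) < 0) {
        min = k;
      }
      if (max == null || k.compareTo(max) > 0) {
        max = k;
      }
    }
    return new RecordKeyColumnStats(fileName, min, max, keys.size(), nulls, false);
  }

  // File pruning check: false means the key cannot be in this file.
  boolean mayContain(String key) {
    return minValue != null && key.compareTo(minValue) >= 0 && key.compareTo(maxValue) <= 0;
  }
}
```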
Column stats records hold key ranges (min and max) for the file. The key for a column stats record is an encoded string representing the tuple of column name, partition name and base file name. The string names of these fields are converted to deterministic hash based IDs and then base64 encoded, just like the bloom filter key.
key = base64_encode(concat(hash64(column name), hash64(partition name), hash128(file name)))
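Both key formulas can be sketched in Java as follows. The RFC only states the hash widths, so the concrete hash functions here (64-bit FNV-1a for hash64, MD5 for hash128) and the MetadataIndexKeys class are assumptions, chosen because they are available in the JDK.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical key builder; the hash function choices are assumptions.
final class MetadataIndexKeys {
  private MetadataIndexKeys() {}

  // Stand-in hash64: 64-bit FNV-1a over the UTF-8 bytes.
  static byte[] hash64(String s) {
    long h = 0xcbf29ce484222325L; // FNV-1a 64-bit offset basis
    for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
      h ^= (b & 0xff);
      h *= 0x100000001b3L;        // FNV-1a 64-bit prime
    }
    return ByteBuffer.allocate(Long.BYTES).putLong(h).array();
  }

  // Stand-in hash128: MD5 digest (16 bytes) of the UTF-8 bytes.
  static byte[] hash128(String s) {
    try {
      return MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 must be available on every Java platform", e);
    }
  }

  // base64_encode(concat(parts...))
  private static String encode(byte[]... parts) {
    int len = 0;
    for (byte[] p : parts) {
      len += p.length;
    }
    ByteBuffer buf = ByteBuffer.allocate(len);
    for (byte[] p : parts) {
      buf.put(p);
    }
    return Base64.getEncoder().encodeToString(buf.array());
  }

  // key = base64_encode(concat(hash64(partition name), hash128(file name)))
  static String bloomFilterKey(String partition, String fileName) {
    return encode(hash64(partition), hash128(fileName));
  }

  // key = base64_encode(concat(hash64(column name), hash64(partition name), hash128(file name)))
  static String columnStatsKey(String column, String partition, String fileName) {
    return encode(hash64(column), hash64(partition), hash128(fileName));
  }
}
```

With these widths, a bloom_filter key is 24 bytes and encodes to 32 base64 characters, and a column_stats key is 32 bytes and encodes to 44 characters (including one padding character).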
While hash based IDs have quite a few desirable properties in the context of Hudi index lookups, they are affected by column level schema changes. Refer to the Schema Evolution section for more details.
The picture below gives a pictorial representation of the column_stats partition in the metadata table.
For the incoming upsert records, given their keys, tag their current location. The new algorithm for the index lookup would be:
- column_stats index: prune the candidate files for each record key using the key ranges (min/max), and generate the file to candidate keys mapping.
- bloom_filter index: for the shortlisted files, look up their bloom filters to further narrow down the candidate keys that are probably present in each file.

Schema Evolution: HashID based keys are deterministically generated from the input tuple. That is, for a tuple consisting of column name, partition name and file name, the generated key is always the same. So, for a table whose schema changes over time, the keys already generated are affected. The most common schema evolution use cases, like changing a column type or adding a new column, are not affected. Other, relatively uncommon, use cases like renaming a column, or dropping a column and later adding a column with the dropped name, would leave index entries that refer to these columns for longer than needed. This would lead to the index lookup matching stale/new records across evolved schemas.
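The two problematic cases can be illustrated with a small Java sketch. The key() helper is a hypothetical stand-in for the hash-based column_stats key; because the hash is deterministic in the tuple, joining the tuple as plain text preserves the property being demonstrated.

```java
import java.util.HashMap;
import java.util.Map;

final class SchemaEvolutionKeys {
  // Hypothetical stand-in for the hash-based column_stats key: deterministic in the tuple.
  static String key(String column, String partition, String file) {
    return String.join("|", column, partition, file);
  }

  public static void main(String[] args) {
    Map<String, String> columnStats = new HashMap<>();
    // Stats written for column "city" of file f1 under the old schema.
    columnStats.put(key("city", "p1", "f1"), "min=Austin,max=Zurich");

    // Rename city -> town: the new name yields a new key, so the lookup misses
    // and the old "city" entry is left behind as a stale record.
    System.out.println("town: " + columnStats.get(key("town", "p1", "f1"))); // prints "town: null"

    // Drop city, then add an unrelated column also named city: same tuple, same key,
    // so the lookup returns stats that were written for the dropped column.
    System.out.println("city: " + columnStats.get(key("city", "p1", "f1"))); // prints "city: min=Austin,max=Zurich"
  }
}
```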
To avoid looking up stale/new index records, here are the design options we have:
/**
 * Looks up the index and tags each incoming record with a location of a file that contains
 * the row (if it is actually present).
 */
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract HoodieData<HoodieRecord<T>> tagLocation(
    HoodieData<HoodieRecord<T>> records,
    HoodieEngineContext context,
    HoodieTable hoodieTable) throws HoodieIndexException;
If hoodie.metadata.file_pruning.enable is enabled, the metadata table based column stats will be used for file pruning based on key ranges.

Let's walk through the writer flow to update these partitions.
Whenever a new commit is applied to the metadata table, we do the following:
We need to ensure that WriteStatus carries all the information the metadata writer needs for every commit.
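As a hypothetical sketch of the metadata writer side, the Java code below turns one commit's per-file write results into bloom_filter and column_stats records. WrittenFile stands in for the per-file information that WriteStatus would need to carry; plain-text keys replace the hash-based encoding, the serialized bloom filter bytes are omitted for brevity, and marking removed base files with isDeleted = true is an assumption based on the isDeleted flag in both schemas.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: per-commit metadata records for the two new partitions.
final class MetadataCommitSketch {

  // Stand-in for the per-file info WriteStatus would carry to the metadata writer.
  static final class WrittenFile {
    final String partition;
    final String fileName;
    final List<String> recordKeys;
    final boolean removed; // true if this commit removed the base file (assumption)

    WrittenFile(String partition, String fileName, List<String> recordKeys, boolean removed) {
      this.partition = partition;
      this.fileName = fileName;
      this.recordKeys = recordKeys;
      this.removed = removed;
    }
  }

  static final class MetadataRecord {
    final String metadataPartition; // "bloom_filter" or "column_stats"
    final String key;               // plain text here; hash-based in the RFC
    final String timestamp;         // instant time of the commit
    final boolean isDeleted;
    final String minValue;          // column_stats only
    final String maxValue;          // column_stats only

    MetadataRecord(String metadataPartition, String key, String timestamp,
                   boolean isDeleted, String minValue, String maxValue) {
      this.metadataPartition = metadataPartition;
      this.key = key;
      this.timestamp = timestamp;
      this.isDeleted = isDeleted;
      this.minValue = minValue;
      this.maxValue = maxValue;
    }
  }

  static List<MetadataRecord> recordsForCommit(String instantTime, List<WrittenFile> files) {
    List<MetadataRecord> records = new ArrayList<>();
    for (WrittenFile f : files) {
      String fileKey = f.partition + "/" + f.fileName;
      // One bloom_filter entry per base file (filter bytes omitted in this sketch).
      records.add(new MetadataRecord("bloom_filter", fileKey, instantTime, f.removed, null, null));
      // One column_stats entry for the record key column, carrying the key range.
      boolean hasKeys = !f.removed && !f.recordKeys.isEmpty();
      records.add(new MetadataRecord("column_stats", "_hoodie_record_key/" + fileKey, instantTime,
          f.removed,
          hasKeys ? Collections.min(f.recordKeys) : null,
          hasKeys ? Collections.max(f.recordKeys) : null));
    }
    return records;
  }
}
```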
When a new batch of writes is ingested into Hudi, we need to tag the records with their original file group locations. This index will leverage both partitions to deduce the record key => file name mappings. Refer to the Metadata Index lookup section for more details.
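The two-level lookup can be sketched in Java as follows. Map lookups stand in for metadata table reads, bloom filters are modeled as a Predicate<String> that answers "might contain", and the filePruningEnabled flag mirrors hoodie.metadata.file_pruning.enable; the class and method names are hypothetical. Keys that survive both levels are only candidates: bloom filters can return false positives, so the candidates still have to be confirmed against the actual base files.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class MetadataIndexLookupSketch {

  // Record key range taken from a column_stats entry.
  static final class KeyRange {
    final String min;
    final String max;

    KeyRange(String min, String max) {
      this.min = min;
      this.max = max;
    }

    boolean covers(String key) {
      return key.compareTo(min) >= 0 && key.compareTo(max) <= 0;
    }
  }

  // Returns file -> incoming keys that may be in that file, for one partition.
  static Map<String, List<String>> candidateKeysByFile(
      List<String> recordKeys,
      List<String> files,
      Map<String, KeyRange> columnStatsByFile,
      Map<String, Predicate<String>> bloomFilterByFile,
      boolean filePruningEnabled) {
    Map<String, List<String>> candidates = new LinkedHashMap<>();
    for (String key : recordKeys) {
      for (String file : files) {
        // Level 1 (column_stats): skip files whose key range cannot contain the key.
        KeyRange range = columnStatsByFile.get(file);
        if (filePruningEnabled && range != null && !range.covers(key)) {
          continue;
        }
        // Level 2 (bloom_filter): false means definitely absent; a missing filter counts as "maybe".
        Predicate<String> mightContain = bloomFilterByFile.getOrDefault(file, k -> true);
        if (mightContain.test(key)) {
          candidates.computeIfAbsent(file, f -> new ArrayList<>()).add(key);
        }
      }
    }
    return candidates;
  }
}
```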
Metadata new indexing for existing tables (RFC proposal and doc pending): the changes that are needed to have this feature in the later minor release need to be rolled out as part of 0.10.0.