JIRA: https://issues.apache.org/jira/browse/HUDI-9798
Please keep the status updated in rfc/README.md.
The HoodieRecordMerger interface allows users to implement custom merging logic for records. The interface does not give clear instructions on how to handle deletions, and its behavior can currently differ depending on the execution engine. This RFC proposes updates to the interface that clarify the expectations, so that out-of-order updates and deletes are handled properly across implementations.
Currently, the HoodieRecordMerger implementations within the Hudi codebase do not handle deletions consistently, which can produce different results when records are merged in the writer path versus the reader path. The current documentation of the interface also does not state how an implementation should handle deleted records.
As part of the migration to the new HoodieFileGroupReader, the merging logic for the common use cases has been refactored around two abstractions: BufferedRecord, which wraps the data together with key information about it such as the ordering values and record key, and RecordContext, which provides a common interface for inspecting the underlying data. These APIs can simplify the implementation of the HoodieRecordMerger and help users write custom mergers that are engine-agnostic.
The HoodieRecordMerger interface will be updated to no longer rely on the HoodieRecord class, and instead use the BufferedRecord and RecordContext classes. The BufferedRecord class contains the data as well as the record key, ordering value, schema identifier, and the HoodieOperation.
The existing interface methods are listed here for reference:

    import java.io.IOException;

    Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema, TypedProperties props) throws IOException;

    Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;

    List<Pair<HoodieRecord, Schema>> fullOuterMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;

    boolean shouldFlush(HoodieRecord record, Schema schema, TypedProperties props) throws IOException;

The methods in the interface will be updated as follows:

    <T> BufferedRecord<T> partialMerge(BufferedRecord<T> older, BufferedRecord<T> newer, Schema readerSchema, RecordContext<T> recordContext, MergerConfig config) throws IOException;

    <T> BufferedRecord<T> merge(BufferedRecord<T> older, BufferedRecord<T> newer, RecordContext<T> recordContext, MergerConfig config) throws IOException;

    /**
     * MergerConfig is a configuration class used to pass all the required properties for merging records.
     * The record key, partition, and ordering field configurations along with options required by the
     * DeleteContext will be present in this object. Any custom options provided to the reader or writer
     * will also be passed to this object to ensure custom mergers and payloads have access to the properties
     * set by the user.
     */
    public class MergerConfig extends HoodieConfig {
      ...
    }

For both the merge and partialMerge methods, the implementation must mark the returned BufferedRecord as a delete operation whenever the merge outcome is that the row matching that record key should be deleted. The implementation should also keep the data in the BufferedRecord non-null whenever possible, so that future merge operations can reference the previous value of the data even when the result is a deletion. If the table has any ordering fields, the ordering values must always be set in the result so that future merges can reference them.
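The delete-handling contract above can be illustrated with a minimal sketch. `SketchRecord` is a hypothetical, simplified stand-in for BufferedRecord (the real class has more fields and a different API), and the ordering-based rule shown is only one possible merge policy, assumed here for illustration. The point is that the winner keeps its delete flag, its data, and its ordering value, so a late-arriving update cannot undo a newer delete.

```java
// Hypothetical, simplified stand-in for BufferedRecord; not the Hudi class.
final class SketchRecord<T> {
    final String recordKey;
    final long orderingValue; // always set when the table has ordering fields
    final T data;             // kept non-null even for deletes when possible
    final boolean delete;     // true when the record represents a delete operation

    SketchRecord(String recordKey, long orderingValue, T data, boolean delete) {
        this.recordKey = recordKey;
        this.orderingValue = orderingValue;
        this.data = data;
        this.delete = delete;
    }
}

final class OrderingMergeSketch {
    // Assumed policy: keep the record with the higher ordering value,
    // with ties going to the newer record. The winner is returned unchanged,
    // so its delete flag, data, and ordering value all survive the merge.
    static <T> SketchRecord<T> merge(SketchRecord<T> older, SketchRecord<T> newer) {
        return older.orderingValue > newer.orderingValue ? older : newer;
    }

    public static void main(String[] args) {
        // A delete with ordering value 10 was processed first.
        SketchRecord<String> delete = new SketchRecord<>("key-1", 10L, "row-v2", true);
        // An update with ordering value 5 arrives late.
        SketchRecord<String> lateUpdate = new SketchRecord<>("key-1", 5L, "row-v1", false);

        SketchRecord<String> merged = merge(delete, lateUpdate);
        System.out.println(merged.delete + " " + merged.orderingValue + " " + merged.data);
    }
}
```

In this sketch the merged result is still a delete that carries ordering value 10 and its previous data, which is what lets a later merge compare against it correctly.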
Using the new BufferedRecord class will allow for easier integration with the BufferedRecordMerger interface used by the HoodieFileGroupReader and avoid converting to and from the HoodieRecord class while merging.
In addition to the updates to the existing methods, two methods will be removed. The shouldFlush method will be removed to simplify the interface. It served as another way for a user to decide not to write the output to a file; instead, any record that should not be written to the file should simply be marked as a delete operation. The fullOuterMerge method will also be removed since it is not used in the current codebase. It can be re-added if a use case for it arises.
The shouldFlush API will be removed from the codebase, and the updates to the existing merger implementations will be completed before the Apache Hudi 1.1.0 release.
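As a rough sketch of how logic that previously lived in shouldFlush might move into the merge result, the example below marks a suppressed record as a delete instead. `FlushSketchRecord`, `shouldSuppress`, and `finish` are hypothetical names introduced only for illustration, and the "empty payload" rule is an assumed placeholder for whatever condition a custom merger used.

```java
// Hypothetical, simplified stand-in for BufferedRecord; not the Hudi class.
final class FlushSketchRecord {
    final String recordKey;
    final long orderingValue;
    final String data;
    final boolean delete;

    FlushSketchRecord(String recordKey, long orderingValue, String data, boolean delete) {
        this.recordKey = recordKey;
        this.orderingValue = orderingValue;
        this.data = data;
        this.delete = delete;
    }

    // Returns a copy flagged as a delete, keeping the data and ordering value.
    FlushSketchRecord asDelete() {
        return new FlushSketchRecord(recordKey, orderingValue, data, true);
    }
}

final class SuppressAsDeleteSketch {
    // Assumed placeholder for a condition that used to make shouldFlush return false.
    static boolean shouldSuppress(String data) {
        return data.isEmpty();
    }

    // Applied to a merge result: a record that should not be written is
    // marked as a delete rather than being filtered by a separate hook.
    static FlushSketchRecord finish(FlushSketchRecord merged) {
        if (!merged.delete && shouldSuppress(merged.data)) {
            return merged.asDelete();
        }
        return merged;
    }

    public static void main(String[] args) {
        FlushSketchRecord kept = finish(new FlushSketchRecord("key-1", 3L, "payload", false));
        FlushSketchRecord dropped = finish(new FlushSketchRecord("key-2", 4L, "", false));
        System.out.println(kept.delete + " " + dropped.delete);
    }
}
```

Because the suppressed record keeps its ordering value and data, it still participates correctly in any later merge for the same key.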