JIRA: https://issues.apache.org/jira/browse/HUDI-3217
Please keep the status updated in rfc/README.md.
Avro has historically been a centerpiece of the Hudi architecture: it is the default representation that many components expect when dealing with records (during merging, column value extraction, writing into storage, etc).
While having a single record representation certainly makes the implementation of some components simpler, it bears the unavoidable performance penalty of a de-/serialization loop: every record handled by Hudi has to be converted from its (low-level) engine-specific representation (`InternalRow` for Spark, `RowData` for Flink, `ArrayWritable` for Hive) into the intermediate one (Avro), with some operations (like clustering and compaction) potentially incurring this penalty multiple times (on both the read- and write-paths).
As such, the goal of this effort is to remove the need to convert from engine-specific internal representations to Avro while handling records.
Historically, Avro has settled in as the de-facto intermediate representation of a record's payload since the early days of Hudi. As the project matured and the scale of installations grew, the necessity to convert into an intermediate representation quickly became a noticeable performance bottleneck in critical Hudi flows.
At the center of it is the hierarchy of `HoodieRecordPayload`s, which is used to hold an individual record's payload, providing APIs like `preCombine` and `combineAndGetUpdateValue` to combine it with other records using some user-defined semantic.
To achieve the stated goal of avoiding unnecessary conversions into the intermediate representation (Avro), existing Hudi workflows operating on individual records will have to be refactored and laid out in a way that makes no assumptions about the internal representation of the record. That is, the code should work with a record as an opaque object: exposing certain APIs to access crucial data (precombine key, primary key, partition key, etc), but not providing access to the raw payload.
Restructuring existing workflows around the record being an opaque object would allow us to encapsulate the internal representation of the record within its class hierarchy, which in turn would allow us to hold engine-specific (Spark, Flink, etc) representations of records without exposing purely engine-agnostic components to them.
The following (high-level) steps are proposed:

1. Promote `HoodieRecord` to become the standardized API for interacting with a single record, which will be
   - replacing usages of `HoodieRecordPayload`, and
   - providing the necessary APIs to access the record's critical data (`getPartitionKey`, `getRecordKey`, etc).
2. Extract the record-merging semantics out of `HoodieRecordPayload` into a standalone, stateless component. Such a component will be
   - engine-aware, operating directly on engine-native record representations, and
   - replacing the `HoodieRecordPayload` abstraction.

Phasing out the usage of `HoodieRecordPayload` will also bring the benefit of avoiding Java reflection in the hot-path, which is known to have poor performance compared to non-reflection-based instantiation.
`combineAndGetUpdateValue` and `preCombine` will converge into a single API. The stateless component interface providing the record-combining API will look like the following:
```java
interface HoodieRecordMerger {

  /**
   * The kind of merging strategy this recordMerger belongs to.
   * A UUID represents the merging strategy.
   */
  String getMergingStrategyId();

  // This method converges combineAndGetUpdateValue and preCombine from HoodieRecordPayload.
  // It has to be an associative operation: f(a, f(b, c)) = f(f(a, b), c)
  // (i.e. given 3 versions A, B, C of a single record, both orders of
  // application have to yield the same result)
  Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;

  // The record type handled by the current merger: SPARK, AVRO, FLINK
  HoodieRecordType getRecordType();
}

/**
 * Spark-specific implementation
 */
class HoodieSparkRecordMerger implements HoodieRecordMerger {

  @Override
  public String getMergingStrategyId() {
    return LATEST_RECORD_MERGING_STRATEGY;
  }

  @Override
  Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    // Implements the particular merging semantic natively for the Spark
    // row representation wrapped in HoodieSparkRecord
  }

  @Override
  HoodieRecordType getRecordType() {
    return HoodieRecordType.SPARK;
  }
}

/**
 * Flink-specific implementation
 */
class HoodieFlinkRecordMerger implements HoodieRecordMerger {

  @Override
  public String getMergingStrategyId() {
    return LATEST_RECORD_MERGING_STRATEGY;
  }

  @Override
  Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    // Implements the particular merging semantic natively for the Flink
    // row representation wrapped in HoodieFlinkRecord
  }

  @Override
  HoodieRecordType getRecordType() {
    return HoodieRecordType.FLINK;
  }
}
```
Users can provide their own subclasses implementing this interface for the engines of interest.
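The associativity requirement on `merge` can be illustrated with a toy, self-contained example (illustrative names only, not Hudi's classes). A "latest ordering value wins" merge, akin to Hudi's default overwrite-with-latest semantics, is associative, which is what allows one API to serve both batch deduplication and merging against a stored record:

```java
import java.util.Optional;

// Toy stand-ins for illustration only (not Hudi's classes): a record carrying
// a key, an ordering value (e.g. event time) and an opaque payload string.
final class ToyRecord {
  final String key;
  final long orderingValue;
  final String payload;

  ToyRecord(String key, long orderingValue, String payload) {
    this.key = key;
    this.orderingValue = orderingValue;
    this.payload = payload;
  }
}

// "Latest ordering value wins" merge. Picking the record with the maximal
// ordering value is associative: f(a, f(b, c)) == f(f(a, b), c).
final class ToyMerger {
  static Optional<ToyRecord> merge(ToyRecord older, ToyRecord newer) {
    return Optional.of(newer.orderingValue >= older.orderingValue ? newer : older);
  }
}

public class AssociativityDemo {
  public static void main(String[] args) {
    ToyRecord a = new ToyRecord("k1", 1L, "v1");
    ToyRecord b = new ToyRecord("k1", 3L, "v3");
    ToyRecord c = new ToyRecord("k1", 2L, "v2");

    // Both application orders converge on the same survivor ("v3"), so the
    // same function can back both preCombine and combineAndGetUpdateValue
    // call sites.
    ToyRecord left = ToyMerger.merge(ToyMerger.merge(a, b).get(), c).get();
    ToyRecord right = ToyMerger.merge(a, ToyMerger.merge(b, c).get()).get();
    System.out.println(left.payload.equals(right.payload)); // prints "true"
  }
}
```

A non-associative function (e.g. "always keep the second argument") would yield different results depending on grouping, and thus could not take on both roles.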
The `RecordMerger` is engine-aware. A config, `HoodieWriteConfig.RECORD_MERGER_IMPLS`, accepts a list of `RecordMerger` class names, and `HoodieWriteConfig.RECORD_MERGER_STRATEGY` holds the UUID of the desired merging strategy. At runtime, Hudi will pick from `RECORD_MERGER_IMPLS` the mergers whose strategy id matches `RECORD_MERGER_STRATEGY` and whose record type matches the engine type.
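The selection logic just described can be sketched as follows. This is a hypothetical illustration, not Hudi's internal code; the names `EngineRecordType`, `Merger`, and `MergerResolver` are assumptions made for the sketch:

```java
import java.util.List;
import java.util.Optional;

// Illustrative stand-in for HoodieRecordType.
enum EngineRecordType { AVRO, SPARK, FLINK }

// Illustrative, trimmed-down view of the merger interface.
interface Merger {
  String getMergingStrategyId();
  EngineRecordType getRecordType();
}

// Hypothetical resolver: from the configured list of merger implementations
// (RECORD_MERGER_IMPLS), pick the first one whose strategy UUID matches the
// configured RECORD_MERGER_STRATEGY and whose record type matches the engine.
final class MergerResolver {
  static Optional<Merger> resolve(List<Merger> configured, String strategyId, EngineRecordType engineType) {
    return configured.stream()
        .filter(m -> m.getMergingStrategyId().equals(strategyId))
        .filter(m -> m.getRecordType() == engineType)
        .findFirst();
  }
}

// Minimal concrete merger used for demonstration.
final class StubMerger implements Merger {
  private final String strategyId;
  private final EngineRecordType type;

  StubMerger(String strategyId, EngineRecordType type) {
    this.strategyId = strategyId;
    this.type = type;
  }

  public String getMergingStrategyId() { return strategyId; }
  public EngineRecordType getRecordType() { return type; }
}
```

For example, with both an AVRO and a SPARK merger configured under the same strategy id, a Spark runtime would resolve the SPARK one; an unknown strategy id resolves to nothing.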
Migrating from `HoodieRecordPayload` to `HoodieRecordMerger`
To warrant backward compatibility (BWC) on the code level with already-created subclasses of `HoodieRecordPayload` currently used in production by Hudi users, we will provide a BWC bridge in the form of a `HoodieRecordMerger` implementation called `HoodieAvroRecordMerger`, which will use the user-defined subclass of `HoodieRecordPayload` to combine the records.
Leveraging this bridge will provide for a seamless BWC migration to the 0.11 release; however, it will forgo the performance benefit of this refactoring, since it would unavoidably have to perform the conversion to the intermediate representation (Avro). To realize the full suite of benefits of this refactoring, users will have to migrate their merging logic out of their `HoodieRecordPayload` subclass and into a new `HoodieRecordMerger` implementation.
Previously, we used to have separate methods for merging:

- `preCombine` was used to either deduplicate records in a batch or merge ones coming from delta-logs, while
- `combineAndGetUpdateValue` was used to combine an incoming record with the one persisted in storage.

Now the semantics of both of these methods are unified in a single `merge` API within the `RecordMerger`, which is required to be an associative operation in order to take on the semantics of both `preCombine` and `combineAndGetUpdateValue`. `HoodieAvroRecordMerger`'s API will look like the following:
```java
/**
 * Backward-compatibility bridge implementation of HoodieRecordMerger,
 * delegating to the user-defined HoodieRecordPayload
 */
class HoodieAvroRecordMerger implements HoodieRecordMerger {

  @Override
  public String getMergingStrategyId() {
    return LATEST_RECORD_MERGING_STRATEGY;
  }

  @Override
  Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    // Delegates to the payload's preCombine and combineAndGetUpdateValue.
    // It has to be an associative operation.
  }

  @Override
  HoodieRecordType getRecordType() {
    return HoodieRecordType.AVRO;
  }
}
```
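The delegation idea behind the bridge can be sketched with toy types (illustrative only; the real `HoodieAvroRecordMerger` operates on `HoodieRecord`s and incurs the Avro round-trip described above):

```java
import java.util.Optional;

// Toy stand-in for the legacy payload contract.
interface LegacyPayload {
  // Legacy method: reconcile this (incoming) payload against the stored one.
  Optional<LegacyPayload> combineAndGetUpdateValue(LegacyPayload stored);
}

// Example legacy payload with "incoming value wins, empty value deletes" semantics.
final class OverwriteLatestPayload implements LegacyPayload {
  final String value;

  OverwriteLatestPayload(String value) { this.value = value; }

  @Override
  public Optional<LegacyPayload> combineAndGetUpdateValue(LegacyPayload stored) {
    // An empty incoming value acts as a delete marker: emit no record.
    return value.isEmpty() ? Optional.empty() : Optional.of(this);
  }
}

// The bridge: the unified merge() simply delegates to the user's legacy
// payload method, preserving existing merging behavior unchanged.
final class BridgeMerger {
  static Optional<LegacyPayload> merge(LegacyPayload older, LegacyPayload newer) {
    return newer.combineAndGetUpdateValue(older);
  }
}
```

This is what lets existing user-defined payload classes keep working through the new single-method API without modification.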
As was called out earlier, to achieve the goal of sustaining engine-internal representations held by the `HoodieRecord` class without compromising major components' neutrality (i.e. being engine-agnostic), the components directly interacting with records' payloads today will have to be refactored to instead interact with the standardized `HoodieRecord` API.
The following major components will be refactored:

- `HoodieWriteHandle`s will be
  - accepting `HoodieRecord` instead of the raw Avro payload (avoiding Avro conversion), and
  - passing `HoodieRecord` as is to the `FileWriter`.
- `HoodieFileWriter`s will be accepting `HoodieRecord`.
- `HoodieRealtimeRecordReader`s will be returning `HoodieRecord` instead of the raw Avro payload.

Because we implement different record types, we need to implement functionality similar to `AvroUtils` in `HoodieRecord` for the different engine-specific payload representations (`GenericRecord`, `InternalRow`, `RowData`). Its public API will look like the following:
```java
class HoodieRecord {

  /**
   * Get column values from the record.
   */
  Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);

  Comparable<?> getOrderingValue(Schema recordSchema, Properties props);

  /**
   * Support bootstrap.
   */
  HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException;

  /**
   * Rewrite record into new schema (add meta columns)
   */
  HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException;

  /**
   * Support schema evolution.
   */
  HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException;

  HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) throws IOException;

  HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException;

  boolean isDelete(Schema recordSchema, Properties props) throws IOException;

  /**
   * Is EmptyRecord. Generated by ExpressionPayload.
   */
  boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException;

  // Other functions with getters or setters
  ...
}
```
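To make the intent of this API concrete, here is an illustrative-only sketch (field names and the map-backed representation are assumptions, not Hudi code) of how an engine-specific subclass can satisfy the opaque API directly over its native representation, without ever converting to Avro:

```java
import java.util.Map;

// Toy engine-native record: a map plays the role of InternalRow / RowData.
final class MapBackedRecord {
  private final Map<String, Object> fields;

  MapBackedRecord(Map<String, Object> fields) {
    this.fields = fields;
  }

  // Mirrors getColumnValues: extract values by column name straight from
  // the native representation.
  Object[] getColumnValues(String[] columns) {
    Object[] out = new Object[columns.length];
    for (int i = 0; i < columns.length; i++) {
      out[i] = fields.get(columns[i]);
    }
    return out;
  }

  // Mirrors getOrderingValue: a designated field drives merge ordering.
  Comparable<?> getOrderingValue(String orderingField) {
    return (Comparable<?>) fields.get(orderingField);
  }

  // Mirrors isDelete: a sentinel field flags tombstones
  // (the field name here is an assumption for the sketch).
  boolean isDelete() {
    return Boolean.TRUE.equals(fields.get("_hoodie_is_deleted"));
  }
}
```

Engine-agnostic components (write handles, readers, mergers) only ever call such accessors, so they stay oblivious to whether the record is backed by a Spark row, a Flink row, or an Avro record.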
To get the full suite of performance benefits, users will have to stop subclassing `HoodieRecordPayload` and instead implement the newly created `HoodieRecordMerger` interface. Usage of `HoodieRecordPayload` for merging will be marked as deprecated in 0.11 and subsequently removed in 0.1x.

This refactoring will not modify any existing Hudi semantics other than the aforementioned ones, and as such, to guarantee preservation of the logical correctness of the many flows affected by the refactoring, we will rely on the existing set of test suites.
Nevertheless, we will run a corresponding set of benchmarks stressing the flows affected by the refactoring, to validate that completely abandoning the conversion into an intermediate representation yields a considerable performance advantage.