JIRA: HUDI-2488
Metadata indexing (aka metadata bootstrapping) is the process of creating one or more metadata-based indexes, e.g. the data-partitions-to-files index, which are stored in the Hudi metadata table. Currently, the metadata table (referred to as MDT hereafter) supports a single partition which is created synchronously with the corresponding data table, i.e. commits are first applied to the metadata table and then to the data table. Our goal for the MDT is to support multiple partitions to boost the performance of existing indexes and record lookups. However, the synchronous manner of metadata indexing is not very scalable as we add more partitions to the MDT, because the regular writers (writing to the data table) have to wait until the MDT commit completes. In this RFC, we propose a design to support asynchronous metadata indexing.
We can read more about the MDT design in RFC-15. Here is a quick summary of the current state (Hudi v0.10.1). The MDT is an internal Merge-on-Read (MOR) table that has a single partition called files, which stores the data-partitions-to-files index used in file listing. The MDT is co-located with the data table (inside the .hoodie/metadata directory under the basepath). In order to handle the multi-writer scenario, users configure a lock provider, and only one writer can access the MDT in read-write mode. Hence, any write to the MDT is guarded by the data table lock. This ensures only one write is committed to the MDT at any point in time and thus guarantees serializability. However, the locking overhead adversely affects write throughput and will reach its scalability limits as we add more partitions to the MDT.
We introduce a new action, index, which will denote the index building process. Its mechanics are as follows:
From an external process, users can issue a CREATE INDEX statement or run a job to trigger indexing for an existing table.
Scheduling will add <instant_time>.index.requested to the timeline, which contains the indexing plan. Index scheduling will also initialize the filegroups for the partitions for which indexing is planned. The creation of filegroups will be done within a lock.
The indexer will then build the index up to instant t, where t is the latest completed instant time on the timeline without any "holes", i.e. no pending async operations prior to it.
Once indexing up to t is complete, the table config (hoodie.properties) will be updated to indicate that the partition is available for reads or synchronous updates. The Hudi table config will be the source of truth for the current state of the metadata index.
Any inflight writers (i.e. with instant time t' > t) will check for any new indexing request on the timeline prior to preparing to commit.
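The base instant t can be illustrated with a minimal sketch. This is not Hudi's actual API; the timeline is modeled as a plain list of (instant, state) pairs, and the helper name is hypothetical.

```python
# Illustrative sketch (hypothetical helper, not Hudi's API): compute the base
# instant t, i.e. the latest completed instant such that no earlier instant
# on the timeline is still pending.

def latest_instant_without_holes(timeline):
    """timeline: list of (instant_time, state) ordered by instant time,
    where state is 'completed' or 'inflight'. Returns the latest completed
    instant with no pending instant before it, or None."""
    base = None
    for instant_time, state in timeline:
        if state != "completed":
            break  # a pending instant creates a "hole"; stop here
        base = instant_time
    return base

# Even though C6-C8 are completed, the inflight C5 caps the base at C4.
timeline = [("C1", "completed"), ("C2", "completed"), ("C3", "completed"),
            ("C4", "completed"), ("C5", "inflight"), ("C6", "completed"),
            ("C7", "completed"), ("C8", "completed")]
print(latest_instant_without_holes(timeline))  # C4
```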
When the indexing process is about to complete (i.e. indexing up to instant t is done, but before completing the indexing commit), it will check all completed commit instants after t to ensure each of them added entries per its indexing plan, otherwise simply abort after a configurable timeout. Let's call this the indexing catchup. So, the indexer will not only write base files but also ensure that log entries due to instants after t land in the same filegroup, i.e. no new filegroup is initialized by writers while indexing is in progress.
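The indexing catchup can be sketched as follows. All names are hypothetical (not Hudi's API), and the configurable timeout is stood in for by a simple poll budget so the sketch stays deterministic.

```python
# Illustrative sketch of the indexing catchup: before completing its commit,
# the indexer verifies that every instant after the base instant t has a
# matching deltacommit in the metadata timeline, polling inflight instants
# up to a budget that stands in for the configurable timeout.

def indexing_catchup(instants_after_t, metadata_deltacommits, reload_state, max_polls=3):
    """Returns True if all instants after t are reflected in the MDT,
    False if the indexer should abort (timeout or missing MDT entry)."""
    for instant in instants_after_t:
        polls = 0
        # inflight instant: periodically reload the timeline until it
        # completes or the poll budget (timeout) is exhausted
        while reload_state(instant) != "completed":
            polls += 1
            if polls > max_polls:
                return False  # timed out waiting; the indexer aborts
        if instant not in metadata_deltacommits:
            return False  # completed on the data timeline but absent in MDT
    return True

states = {"t4": "completed", "t5": "completed"}
print(indexing_catchup(["t4", "t5"], {"t4", "t5"}, states.get))  # True
```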
We can simply introduce a lock for adding events to the timeline, and these races would vanish completely while still providing great scalability and asynchrony for these processes. The indexer will error out if there is no lock provider configured.
Let us walk through a concrete multi-writer scenario to understand the above indexing mechanism. In this scenario, let instant t0 be the last completed instant on the timeline. Suppose the user triggered index building from an external process at t3. This will create the t3.index.requested file with the indexing plan. The plan contains the metadata partitions that need to be created and the last completed instant, e.g.
[ {MetadataPartitionType.FILES.partitionPath(), t0}, {MetadataPartitionType.BLOOM_FILTER.partitionPath(), t0}, {MetadataPartitionType.COLUMN_STATS.partitionPath(), t0} ]
Further, suppose there were two inflight writers, Writer1 and Writer2 (with inflight instants t1 and t2 respectively), while the indexing was requested or inflight. In this case, the writers will check for a pending index action and find the pending instant t3. Now, if the metadata index creation is pending, which means the indexer has already initialized a filegroup, then each writer will create log files in the same filegroup for the metadata index update. This will happen within the existing data table lock.
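A minimal sketch of this writer-side behavior, under stated assumptions (the helper and data structures are hypothetical, not Hudi's API):

```python
# Sketch of the writer-side path: under the data table lock, a writer that
# finds a pending index action appends its metadata update as a log file to
# the filegroup the indexer already initialized, rather than creating a new
# filegroup, so the indexing catchup can later find the entry.

def append_metadata_update(pending_index_partitions, filegroups, partition, instant):
    """filegroups: partition -> list of log files in the indexer-initialized
    filegroup. Returns the filegroup contents the update was logged to, or
    None if the partition is not under indexing."""
    if partition not in pending_index_partitions:
        return None  # regular (synchronous) metadata update path applies
    # reuse the existing filegroup; never initialize a new one while
    # indexing is in progress
    filegroups[partition].append(f"log-{instant}")
    return filegroups[partition]

fgs = {"column_stats": []}
append_metadata_update({"column_stats"}, fgs, "column_stats", "t1")
append_metadata_update({"column_stats"}, fgs, "column_stats", "t2")
print(fgs["column_stats"])  # ['log-t1', 'log-t2']
```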
The indexer runs in a loop until the metadata for data up to t0, plus the data written due to t1 and t2, has been indexed, or the indexing timed out. Whether indexing timed out or not, the table config will be updated with any MDT partition(s) for which indexing was complete till t2. In case of a timeout, the indexer will abort. At this point, the user can trigger the indexing process again; however, this time the indexer will check for available partitions in the table config and skip those partitions. This design ensures that the regular writers do not fail due to indexing.
The scheduling initializes the file groups for metadata partitions in a lock. It does not update any table config.
1. Run pre-scheduling validation (valid index requested, lock provider configured, idempotent checks)
2. Begin transaction
   a. Get the base instant
   b. Start initializing file groups for each partition
   c. Create the index plan and save the indexing.requested instant to the timeline
3. End transaction
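The transactional shape of scheduling can be sketched as below. The lock, timeline, and storage are simulated with plain Python objects; this is not Hudi's actual API, only an illustration of the abort semantics described here.

```python
# Sketch of the scheduling phase: initialize filegroups and persist the plan
# inside a lock; on any failure, delete whatever partitions were initialized
# (the graceful abort described in the text).
import threading

def schedule_indexing(lock, partitions, timeline, metadata_fs):
    with lock:                                  # begin transaction
        base_instant = timeline[-1]             # a. get the base instant (simplified)
        initialized = []
        try:
            for p in partitions:                # b. initialize file groups
                metadata_fs[p] = ["filegroup-0"]
                initialized.append(p)
            plan = {"base_instant": base_instant, "partitions": list(partitions)}
            timeline.append(("index.requested", plan))  # c. save the plan
            return plan
        except Exception:
            for p in initialized:               # graceful abort: undo partial init
                del metadata_fs[p]
            raise

lock = threading.Lock()
timeline = ["t0"]
mdt_fs = {}
plan = schedule_indexing(lock, ["files", "column_stats"], timeline, mdt_fs)
print(plan["base_instant"])  # t0
```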
If there is a failure in any of the above steps, then we abort gracefully, i.e. delete the metadata partition if it was initialized.
This is a separate executor, which reads the plan and builds the index.
1. Run pre-indexing checks (lock provider configured, indexing.requested exists, idempotent checks)
2. Read the indexing plan; if any of the requested partitions is inflight or already completed, then error out and return early
3. Transition indexing.requested to inflight
4. Build metadata partitions
   a. Build the base file in the metadata partition to index up to the instant as per the plan
   b. Update the inflight partitions config in hoodie.properties
5. Determine the catchup start instant based on the write and non-write timelines
6. Start the indexing catchup in a separate thread (that can be interrupted upon timeout)
   a. For each instant to catch up:
      i. If the instant is completed and has a corresponding deltacommit in the metadata timeline, then continue
      ii. If the instant is inflight, then reload the active timeline periodically until it is completed or timed out
      iii. Update the metadata table, if needed, within a lock
7. Build the indexing commit metadata with the partition info and caught-up-to instant
8. Begin transaction
   a. Update completed metadata partitions in the table config
   b. Save the indexing commit metadata to the timeline and transition indexing.inflight to completed
9. End transaction
If there is a failure in any of the above steps, then we abort gracefully, i.e. delete the metadata partition if it exists and revert the table config updates.
# enable metadata
hoodie.metadata.enable=true
# enable asynchronous metadata indexing
hoodie.metadata.index.async=true
# enable column stats index
hoodie.metadata.index.column.stats.enable=true
# set indexing catchup timeout
hoodie.metadata.index.check.timeout.seconds=60
# set OCC concurrency mode
hoodie.write.concurrency.mode=optimistic_concurrency_control
# set lock provider
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
While upgrading from a previous version to the current version, if metadata is enabled and the files partition exists, then the completed partitions in hoodie.properties will be updated to include the files partition. While downgrading to a previous version, if the metadata table exists, then it is deleted, because the metadata table in the current version has a schema that is not forward compatible.
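This upgrade/downgrade handling can be sketched as below. Note that the config key name completed.metadata.partitions is illustrative only, not the actual Hudi table property.

```python
# Hedged sketch of the upgrade/downgrade handling; key names and helpers are
# illustrative, not Hudi's actual table config schema.

def on_upgrade(table_config, metadata_partitions_on_disk):
    # if metadata is enabled and the files partition already exists, record
    # it as a completed partition in the table config
    if table_config.get("hoodie.metadata.enable") == "true" \
            and "files" in metadata_partitions_on_disk:
        table_config["completed.metadata.partitions"] = "files"

def on_downgrade(metadata_partitions_on_disk):
    # the MDT schema is not forward compatible, so drop the whole table
    metadata_partitions_on_disk.clear()

cfg = {"hoodie.metadata.enable": "true"}
disk = {"files"}
on_upgrade(cfg, disk)
print(cfg["completed.metadata.partitions"])  # files
```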
Case 1: Writer fails while indexer is inflight
This means the index update due to the writer did not complete. The indexer continues to build the index, ignoring the failed instant due to the writer. The next update by the writer will trigger a rollback of the failed instant, which will also roll back the incomplete updates in the metadata table.
Case 2: Indexer fails while writer is inflight
The writer will commit, adding log entries to the metadata partition. However, the table config will indicate that the partition is not ready to use. When the indexer is re-triggered, it will check the plan and table config to figure out which MDT partitions to index and start indexing for those partitions.
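The narrowing step on re-trigger can be sketched with a small hypothetical helper (not Hudi's API): only planned partitions not already marked complete in the table config are rebuilt.

```python
# Sketch: a re-triggered indexer skips partitions already marked complete in
# the table config, indexing only the remaining planned partitions.

def partitions_to_index(planned, completed_in_table_config):
    return [p for p in planned if p not in completed_in_table_config]

planned = ["files", "bloom_filter", "column_stats"]
print(partitions_to_index(planned, {"files"}))  # ['bloom_filter', 'column_stats']
```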
Case 3: Race conditions
a) Writer went inflight just after an indexing request was added but indexer has not yet started executing.
In this case, writer will continue to log updates in metadata partition. At the time of execution, indexer will see there are already some log files and ensure that the indexing catchup passes.
b) Inflight writer about to commit, but indexing completed just before that.
Ideally, the indexing catchup in the indexer should have failed. But this could happen if the writer's commit lands just after the indexer's final catchup check but before the writer notices the completed index.
In this case, the writer will continue to write log files under the latest base filegroup in the MDT partition. Even though the indexer missed the updates due to the writer, there is no "index loss" as such, i.e. metadata due to the writer is still updated in the MDT partition. Async compaction on the MDT will eventually merge the updates into another base file.
Or, we can introduce a lock for adding events to the metadata timeline.
c) Inflight writer about to commit but index is still being scheduled
To prevent this race, we ensure that scheduling for the metadata index always happens within a lock. Since the initialization of filegroups happens at the time of scheduling, the indexer will hold the lock until all the filegroups are created.
Case 4: Async table services
The metadata partition cannot be used if there is any pending index action against it. So, async compaction/cleaning/clustering will ignore the metadata partition for which indexing is inflight.
Case 5: Data timeline with holes
Let's say the data timeline when the indexer is started looks like: C1, C2, ..., C5 (inflight), C6, C7, C8, where C1 is a commit at instant 1. In this case, the latest completed instant without any hole is C4, so the indexer will index up to C4. Instants C5-C8 will go through the indexing catchup. If C5 does not complete before the timeout, then the indexer will abort. The indexer will run through the same process again when re-triggered.
The above example contained only write commits; however, the indexer will consider non-write commits (such as clean/restore/rollback) as well. Let's take such an example:
| Action  | DC | DC | DC | CLEAN | DC | DC | COMPACT | DC | INDEXING | DC |
|---------|----|----|----|-------|----|----|---------|----|----------|----|
| Instant | 1  | 2  | 3  | 4     | 5  | 6  | 7       | 8  | 9        | 10 |
| State   | C  | C  | C  | I     | C  | C  | R       | C  | R        | I  |
Here, DC indicates a deltacommit, the second row is the instant time, and the last row is whether the action is completed (C), inflight (I) or requested (R). In this case, the base instant up to which there are no holes in the write timeline is DC6. The indexer will also check the earliest pending instant in the non-write timeline before this base instant, which is CLEAN4. While the indexing is done up to the base instant, the remaining instants (CLEAN4, COMPACT7, DC8) are checked during the indexing catchup to verify that they logged updates to the corresponding filegroup as per the index plan. Note that during catchup, the indexer won't move past an instant unless it actually gets into the completed state. For instance, if CLEAN4 were inflight till the configured timeout, then the indexer would abort.
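Determining the catchup start instant from this example can be sketched as below. Helper names are illustrative; note that compaction is a write action, so the pending COMPACT at instant 7 is what caps the base instant at DC6, and the catchup start is then pulled back to CLEAN4.

```python
# Sketch: base instant from the write timeline (latest completed write with
# no earlier pending write), pulled back to the earliest pending non-write
# instant before it.

def catchup_start(write_timeline, nonwrite_timeline):
    """Timelines are lists of (instant_time, state) sorted by instant time,
    with state C (completed), I (inflight) or R (requested)."""
    base = None
    for t, state in write_timeline:
        if state != "C":
            break  # a pending write instant is a hole
        base = t
    if base is None:
        return None
    pending = [t for t, state in nonwrite_timeline if state != "C" and t < base]
    return min(pending) if pending else base

# Example from the table: deltacommits and the compaction form the write
# timeline; clean and indexing form the non-write timeline.
write = [(1, "C"), (2, "C"), (3, "C"), (5, "C"), (6, "C"),
         (7, "R"), (8, "C"), (10, "I")]
nonwrite = [(4, "I"), (9, "R")]
print(catchup_start(write, nonwrite))  # 4
```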
There can be two kinds of existing users:
a) Enabling metadata for the first time: There should not be any impact on such users. When they enable metadata, they can trigger the indexing process.
b) Metadata already enabled: Such users already have a metadata table with at least one partition. If they trigger the indexing process, then the indexer should take into account the existing metadata and ignore instants up to which the MDT is in sync with the data table.
The changes will be backward-compatible, and if async indexing is disabled then the existing behavior of MDT creation and updates will be used.
Not required.
Not required.