title: “Employing correct configurations for Hudi's cleaner table service” excerpt: “Ensuring isolation between Hudi writers and readers using HoodieCleaner.java
” author: pratyakshsharma category: blog image: /assets/images/blog/hoodie-cleaner/Initial_timeline.png tags:
Apache Hudi provides snapshot isolation between writers and readers. This is made possible by Hudi’s MVCC concurrency model. In this blog, we will explain how to employ the right configurations to manage multiple file versions. Furthermore, we will discuss mechanisms available to users on how to maintain just the required number of old file versions so that long running readers do not fail.
Hudi provides different table management services to be able to manage your tables on the data lake. One of these services is called the Cleaner. As you write more data to your table, for every batch of updates received, Hudi can either generate a new version of the data file with updates applied to records (COPY_ON_WRITE) or write these delta updates to a log file, avoiding rewriting newer version of an existing file (MERGE_ON_READ). In such situations, depending on the frequency of your updates, the number of file versions of log files can grow indefinitely. If your use-cases do not require keeping an infinite history of these versions, it is imperative to have a process that reclaims older versions of the data. This is Hudi’s cleaner service.
In a data lake architecture, it is a very common scenario to have readers and writers concurrently accessing the same table. As the Hudi cleaner service periodically reclaims older file versions, scenarios arise where a long running query might be accessing a file version that is deemed to be reclaimed by the cleaner. Here, we need to employ the correct configs to ensure readers (aka queries) don’t fail.
To deal with the mentioned scenario, lets understand the different cleaning policies that Hudi offers and the corresponding properties that need to be configured. Options are available to schedule cleaning asynchronously or synchronously. Before going into more details, we would like to explain a few underlying concepts:
<fileId>_<writeToken>_<instantTime>.parquet
. In subsequent writes of this file, file id remains the same and commit time gets updated to show the latest version. This also implies any particular version of a record, given its partition path, can be uniquely located using the file id and instant time.Hudi cleaner currently supports below cleaning policies:
Suppose a user is ingesting data into a hudi dataset of type COPY_ON_WRITE every 30 minutes as shown below:
Figure1: Incoming records getting ingested into a hudi dataset every 30 minutes
The figure shows a particular partition on DFS where commits and corresponding file versions are color coded. 4 different file groups are created in this partition as depicted by fileGroup1, fileGroup2, fileGroup3 and fileGroup4. File group corresponding to fileGroup2 has records ingested from all the 5 commits, while the group corresponding to fileGroup4 has records from the latest 2 commits only.
Suppose the user uses the below configs for cleaning:
hoodie.cleaner.policy=KEEP_LATEST_COMMITS hoodie.cleaner.commits.retained=2
Cleaner selects the versions of files to be cleaned by taking care of the following:
commit 10:30
and commit 10:00
correspond to the latest 2 commits in the timeline. One extra commit is included because the time window for retaining commits is essentially equal to the longest query run time. So if the longest query takes 1 hour to finish, and ingestion happens every 30 minutes, you need to retain last 2 commits since 2*30 = 60 (1 hour). At this point of time, the longest query can still be using files written in 3rd commit in reverse order. Essentially this means if a query started executing after commit 9:30
, it will still be running when clean action is triggered right after commit 10:30
as in Figure2.commit 9:30
in figure below) in reverse order.Figure2: Files corresponding to latest 3 commits are retained
Now, suppose the user uses the below configs for cleaning:
hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS hoodie.cleaner.fileversions.retained=1
Cleaner does the following:
commit 10:30
, the cleaner will simply leave the latest version in every file group and delete the rest.Figure3: Latest file version in every file group is retained
You can find the details about all the possible configurations along with the default values here.
Hudi's cleaner table service can be run as a separate process or along with your data ingestion. As mentioned earlier, it basically cleans up any stale/old files lying around. In case you want to run it along with ingesting data, configs are available which enable you to run it synchronously or asynchronously. You can use the below command for running the cleaner independently:
[hoodie]$ spark-submit --class org.apache.hudi.utilities.HoodieCleaner \ --props s3:///temp/hudi-ingestion-config/kafka-source.properties \ --target-base-path s3:///temp/hudi \ --spark-master yarn-cluster
In case you wish to run the cleaner service asynchronously with writing, please configure the below:
hoodie.clean.automatic=true hoodie.clean.async=true
Further you can use Hudi CLI for managing your Hudi dataset. CLI provides the below commands for cleaner service:
cleans show
clean showpartitions
cleans run
You can find more details and the relevant code for these commands in org.apache.hudi.cli.commands.CleansCommand
class.
Work is currently going on for introducing a new cleaning policy based on time elapsed. This will help in achieving a consistent retention throughout regardless of how frequently ingestion happens. You may track the progress here.
We hope this blog gives you an idea about how to configure the Hudi cleaner and the supported cleaning policies. Please visit the blog section for a deeper understanding of various Hudi concepts. Cheers!