Hoodie provides the following primitives over datasets on HDFS
In order to achieve this, Hoodie maintains a timeline of all activity performed on the dataset, which helps provide instantaneous views of the dataset while also efficiently supporting retrieval of data in the order of arrival into the dataset. Such key activities include:
- COMMITS - A single commit captures information about an atomic write of a batch of records into a dataset. Commits are identified by a monotonically increasing timestamp, denoting the start of the write operation.
- CLEANS - Background activity that gets rid of older versions of files in the dataset that are no longer needed.
- COMPACTIONS - Background activity to reconcile differential data structures within Hoodie, e.g. moving updates from row-based log files to columnar formats.

{% include image.html file="hoodie_timeline.png" alt="hoodie_timeline.png" %}
The example above shows upserts happening between 10:00 and 10:20 on a Hoodie dataset, roughly every 5 mins, leaving commit metadata on the Hoodie timeline, along with other background cleaning/compactions. One key observation is that the commit time indicates the arrival time of the data (10:20AM), while the actual data organization reflects the actual time, or event time, the data was intended for (hourly buckets from 07:00). These are two key concepts when reasoning about tradeoffs between latency and completeness of data.
When there is late-arriving data (data intended for 9:00 arriving more than 1 hr late, at 10:20), we can see the upsert producing new data into even older time buckets/folders. With the help of the timeline, an incremental query attempting to get all new data committed successfully since 10:00 is able to very efficiently consume only the changed files, without, say, scanning all the time buckets after 07:00.
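The incremental query described above can be sketched in a few lines of Python. This is a toy model under stated assumptions, not Hoodie's API: the `Commit` class, the `files_changed_since` helper, the file names and the per-commit contents are all made up for illustration. It only shows why recording the changed files per commit lets a consumer skip untouched time buckets.

```python
from dataclasses import dataclass


@dataclass
class Commit:
    """One completed commit on a toy timeline (hypothetical structure)."""
    commit_time: str     # arrival time of the batch, zero-padded "HH:MM"
    changed_files: dict  # event-time bucket -> files written by this commit


# Upserts roughly every 5 minutes between 10:00 and 10:20; contents are invented.
TIMELINE = [
    Commit("10:00", {"07:00": ["f1"], "08:00": ["f2"]}),
    Commit("10:05", {"09:00": ["f3"]}),
    Commit("10:10", {"10:00": ["f4"]}),
    Commit("10:15", {"10:00": ["f4"]}),
    Commit("10:20", {"09:00": ["f3"], "10:00": ["f5"]}),  # late data lands in the 09:00 bucket
]


def files_changed_since(timeline, since):
    """Return {bucket: sorted files} touched by commits strictly after `since`."""
    changed = {}
    for commit in timeline:
        if commit.commit_time > since:  # zero-padded HH:MM strings compare correctly
            for bucket, files in commit.changed_files.items():
                changed.setdefault(bucket, set()).update(files)
    return {bucket: sorted(files) for bucket, files in changed.items()}


# Only the 09:00 and 10:00 buckets are touched; 07:00 and 08:00 are never scanned.
incremental = files_changed_since(TIMELINE, "10:00")
```

The consumer reads commit metadata only, so its cost depends on how much changed since the last pull rather than on the total number of time buckets.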
Hoodie storage types capture how data is indexed and laid out on the filesystem, and how the above primitives and timeline activities are implemented on top of such organization (i.e. how data is written). This is not to be confused with the notion of Read Optimized and Near Real-time tables, which are merely how the underlying data is exposed to queries (i.e. how data is read).
Hoodie supports (or will support) the following storage types.
Storage Type | Supported Tables |
---|---|
Copy On Write | Read Optimized |
Merge On Read | Read Optimized + Near Real-time |
{% include callout.html content="Hoodie is a young project. merge-on-read is currently underway. Get involved here" type="info" %}
Regardless of the storage type, Hoodie organizes a dataset into a directory structure under a basepath, very similar to Hive tables. The dataset is broken up into partitions, which are folders containing the files for that partition. Each partition is uniquely identified by its partitionpath, which is relative to the basepath.
Within each partition, records are distributed into multiple files. Each file is identified by a unique file id and the commit that produced the file. In case of updates, multiple files can share the same file id but be written at different commits.
Each record is uniquely identified by a record key and mapped to a file id forever. This mapping between record key and file id never changes once the first version of a record has been written to a file. In short, the file id identifies a group of files that contain all versions of a group of records.
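The layout rules above can be illustrated with a small Python sketch. Everything named here is an assumption made for the example: the `data_file_path` naming scheme, the `.parquet` extension, the `FileIdIndex` class and the sample key and paths are hypothetical and do not describe Hoodie's actual file naming or index.

```python
import posixpath


def data_file_path(basepath, partitionpath, file_id, commit_time):
    """Build a path for one file version; the naming scheme is invented for illustration."""
    return posixpath.join(basepath, partitionpath, f"{file_id}_{commit_time}.parquet")


class FileIdIndex:
    """Toy record-key -> file-id mapping that is fixed once assigned."""

    def __init__(self):
        self._key_to_file_id = {}

    def locate_or_assign(self, record_key, new_file_id):
        # setdefault stores new_file_id only the first time a key is seen
        return self._key_to_file_id.setdefault(record_key, new_file_id)


index = FileIdIndex()
first = index.locate_or_assign("trip-42", "fileA")   # insert: key gets a file id
again = index.locate_or_assign("trip-42", "fileB")   # update: still routed to fileA

# Two versions of the same file id, written at different commits in one partition.
v1 = data_file_path("/data/trips", "2017/03/01", first, "20170301100000")
v2 = data_file_path("/data/trips", "2017/03/01", again, "20170301100500")
```

Both paths share the file id and differ only in the commit, which is what lets a reader group all versions of the same records together.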
As mentioned above, each commit on Copy On Write storage produces new versions of files. In other words, we implicitly compact on every commit, such that only columnar data exists. As a result, the write amplification (number of bytes written for 1 byte of incoming data) is much higher, whereas read amplification is close to zero. This is a much desired property for a system like Hadoop, which is predominantly read-heavy.
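As a rough worked example of the write amplification definition above, the arithmetic can be written out in Python. The batch size, file size and number of touched files are invented numbers chosen only to make the ratio easy to follow.

```python
def write_amplification(bytes_written, bytes_incoming):
    """Bytes written to storage per byte of incoming data."""
    return bytes_written / bytes_incoming


MB = 1024 * 1024
incoming = 10 * MB     # size of the incoming batch (made-up number)
touched_files = 5      # the batch updates records spread over 5 files
file_size = 100 * MB   # each touched file is rewritten in full

# Copy on write rewrites every touched file: 500 MB written for 10 MB of input.
cow_wa = write_amplification(touched_files * file_size, incoming)
```

Under these assumptions the ratio is 50, while a reader still opens exactly one columnar file per file id.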
The following illustrates how this works conceptually, when data is written into copy-on-write storage with two queries running on top of it.
{% include image.html file="hoodie_cow.png" alt="hoodie_cow.png" %}
As data gets written, updates to existing file ids produce a new version for that file id stamped with the commit, while inserts allocate a new file id and write its first version. These file versions and their commits are color coded above. A normal SQL query running against such a dataset (e.g. select count(*) counting the total records in that partition) first checks the timeline for the latest commit and filters out all but the latest version of each file id. As you can see, an old query does not see the current inflight commit's files colored in pink, but a new query starting after the commit picks up the new data. Thus queries are immune to any write failures/partial writes and only run on committed data.
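The write and query behavior described above can be modeled with a minimal in-memory sketch. This is not Hoodie code: the `CopyOnWriteDataset` class, its method names and the rule that all inserts in a batch share one new file id are simplifying assumptions made for the example.

```python
class CopyOnWriteDataset:
    """Toy model: every write produces a full new version of each touched file id."""

    def __init__(self):
        self.completed_commits = []  # timeline of successful commits, in order
        self.file_versions = {}      # file_id -> {commit_time: {record_key: value}}
        self.key_to_file_id = {}     # record key -> file id, fixed once assigned

    def write(self, commit_time, records):
        """Write a batch; its files stay invisible to queries until commit() is called."""
        new_file_id = f"file-{len(self.file_versions)}"  # used only if the batch has inserts
        touched = {}
        for key, value in records.items():
            file_id = self.key_to_file_id.setdefault(key, new_file_id)
            touched.setdefault(file_id, {})[key] = value
        for file_id, changes in touched.items():
            versions = self.file_versions.setdefault(file_id, {})
            committed = [c for c in versions if c in self.completed_commits]
            previous = versions[max(committed)] if committed else {}
            versions[commit_time] = {**previous, **changes}  # rewrite the whole file

    def commit(self, commit_time):
        self.completed_commits.append(commit_time)

    def latest_commit(self):
        return self.completed_commits[-1] if self.completed_commits else None

    def query_count(self, as_of):
        """Count records in the latest version of each file id committed at or before as_of."""
        total = 0
        for versions in self.file_versions.values():
            visible = [c for c in versions if c in self.completed_commits and c <= as_of]
            if visible:
                total += len(versions[max(visible)])
        return total


ds = CopyOnWriteDataset()
ds.write("10:00", {"k1": "a", "k2": "b"})
ds.commit("10:00")

old_snapshot = ds.latest_commit()            # an old query pins the latest commit it saw
ds.write("10:05", {"k2": "b2", "k3": "c"})   # inflight: one update, one insert
count_during_inflight = ds.query_count(ds.latest_commit())

ds.commit("10:05")
new_count = ds.query_count(ds.latest_commit())  # a new query sees the new data
old_count = ds.query_count(old_snapshot)        # the old query is unaffected
```

If the second write had failed before `commit`, its file versions would simply never become visible, which is the sense in which queries only run on committed data.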
The intention of copy on write storage is to fundamentally improve how datasets are managed today on Hadoop through
Merge on read storage is a superset of copy on write, in the sense that it still provides a read optimized view of the dataset via the Read Optimized table. Additionally, it stores incoming upserts for each file id onto a row-based append log, which enables providing near real-time data to queries by applying the append log onto the latest version of each file id on the fly at query time. Thus, this storage type attempts to balance read and write amplification intelligently, to provide near real-time queries. The most significant change here is to the compactor, which now carefully chooses which append logs need to be compacted onto their columnar base data to keep query performance in check (larger append logs would incur longer merge times when merging data on the query side).
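The two views and the role of the compactor can be sketched as follows. This is a toy model, not Hoodie's implementation: the `FileGroup` class, its method names and the size-threshold policy in `pick_for_compaction` are assumptions made for illustration, and the source does not say how the real compactor chooses logs.

```python
class FileGroup:
    """Toy merge-on-read file group: a columnar base plus a row-based append log."""

    def __init__(self, base_records):
        self.base = dict(base_records)  # stands in for the columnar base file
        self.append_log = []            # (record_key, value) upserts in arrival order

    def upsert(self, key, value):
        self.append_log.append((key, value))  # cheap write: the base is not rewritten

    def read_optimized(self):
        return dict(self.base)  # base only, so it may lag behind recent upserts

    def near_real_time(self):
        merged = dict(self.base)
        for key, value in self.append_log:  # apply the log on the fly at query time
            merged[key] = value
        return merged

    def compact(self):
        self.base = self.near_real_time()  # fold the log into a new base
        self.append_log = []


def pick_for_compaction(groups, max_log_entries):
    """Hypothetical policy: compact groups whose log has grown past a threshold."""
    return [name for name, group in groups.items()
            if len(group.append_log) > max_log_entries]


groups = {"fileA": FileGroup({"k1": 1}), "fileB": FileGroup({"k9": 9})}
groups["fileA"].upsert("k1", 2)
groups["fileA"].upsert("k2", 5)
groups["fileA"].upsert("k2", 6)
groups["fileB"].upsert("k9", 10)

before_ro = groups["fileA"].read_optimized()   # still the old base
before_rt = groups["fileA"].near_real_time()   # base merged with the log

to_compact = pick_for_compaction(groups, max_log_entries=2)
for name in to_compact:
    groups[name].compact()

after_ro = groups["fileA"].read_optimized()    # now caught up with the log
```

The merge cost in `near_real_time` grows with the length of the append log, which is why the choice of what to compact directly affects query latency.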
The following illustrates how the storage works, and shows queries on both the near real-time table and the read optimized table.
{% include image.html file="hoodie_mor.png" alt="hoodie_mor.png" max-width="1000" %}
There are a lot of interesting things happening in this example, which bring out the subtleties in the approach.
The intention of merge on read storage is to enable near real-time processing directly on top of Hadoop, as opposed to copying data out to specialized systems, which may not be able to handle the data volume.