The native Rust implementation for Apache Hudi, with C++ & Python API bindings.

The Hudi-rs project aims to standardize the core Apache Hudi APIs and broaden Hudi's integration across data ecosystems for a diverse range of users and projects.

Source       Installation Command
PyPI.org     pip install hudi
Crates.io    cargo add hudi

Usage Examples

[!NOTE] These examples expect that a Hudi table exists at /tmp/trips_table, created by following the quick start guide.

Snapshot Query

A snapshot query reads the latest version of the data from the table. The table API also accepts partition filters.

Python

from hudi import HudiTableBuilder
import pyarrow as pa

hudi_table = HudiTableBuilder.from_base_uri("/tmp/trips_table").build()
batches = hudi_table.read_snapshot(filters=[("city", "=", "san_francisco")])

# convert to PyArrow table
arrow_table = pa.Table.from_batches(batches)
result = arrow_table.select(["rider", "city", "ts", "fare"])
print(result)

Rust

use hudi::error::Result;
use hudi::table::builder::TableBuilder as HudiTableBuilder;
use arrow::compute::concat_batches;

#[tokio::main]
async fn main() -> Result<()> {
    let hudi_table = HudiTableBuilder::from_base_uri("/tmp/trips_table").build().await?;
    let batches = hudi_table.read_snapshot(&[("city", "=", "san_francisco")]).await?;
    let batch = concat_batches(&batches[0].schema(), &batches)?;
    let columns = vec!["rider", "city", "ts", "fare"];
    for col_name in columns {
        let idx = batch.schema().index_of(col_name).unwrap();
        println!("{}: {}", col_name, batch.column(idx));
    }
    Ok(())
}

To run a read-optimized (RO) query on Merge-on-Read (MOR) tables, set hoodie.read.use.read_optimized.mode to true when creating the table instance.

Python

hudi_table = (
    HudiTableBuilder
    .from_base_uri("/tmp/trips_table")
    .with_option("hoodie.read.use.read_optimized.mode", "true")
    .build()
)

Rust

let hudi_table = 
    HudiTableBuilder::from_base_uri("/tmp/trips_table")
    .with_option("hoodie.read.use.read_optimized.mode", "true")
    .build().await?;

Time-Travel Query

A time-travel query reads the table's data as of a specific timestamp. The table API also accepts partition filters.

Python

batches = (
    hudi_table
    .read_snapshot_as_of("20241231123456789", filters=[("city", "=", "san_francisco")])
)

Rust

let batches = 
    hudi_table
    .read_snapshot_as_of("20241231123456789", &[("city", "=", "san_francisco")]).await?;

The supported formats for the timestamp argument are:

  • Hudi Timeline format (highest matching precedence): yyyyMMddHHmmssSSS or yyyyMMddHHmmss.
  • Unix epoch time in seconds, milliseconds, microseconds, or nanoseconds.
  • ISO 8601 format including but not limited to:
    • yyyy-MM-dd'T'HH:mm:ss.SSS+00:00
    • yyyy-MM-dd'T'HH:mm:ss.SSSZ
    • yyyy-MM-dd'T'HH:mm:ss.SSS
    • yyyy-MM-dd'T'HH:mm:ss+00:00
    • yyyy-MM-dd'T'HH:mm:ssZ
    • yyyy-MM-dd'T'HH:mm:ss
    • yyyy-MM-dd
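
For instance, any of the forms above can be passed as the timestamp argument. A minimal Rust sketch, reusing hudi_table and the filter from the snapshot example (the three timestamp values are purely illustrative):

// Hudi Timeline format: yyyyMMddHHmmssSSS
let v1 = hudi_table
    .read_snapshot_as_of("20241231123456789", &[("city", "=", "san_francisco")]).await?;

// The same style of call with an ISO 8601 timestamp (explicit UTC offset)
let v2 = hudi_table
    .read_snapshot_as_of("2024-12-31T12:34:56.789+00:00", &[("city", "=", "san_francisco")]).await?;

// ... or with Unix epoch time in milliseconds
let v3 = hudi_table
    .read_snapshot_as_of("1735648496789", &[("city", "=", "san_francisco")]).await?;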

Incremental Query

An incremental query reads the changed data from the table for a given time range.

Python

# read the records between t1 (exclusive) and t2 (inclusive)
batches = hudi_table.read_incremental_records(t1, t2)

# read the records after t1
batches = hudi_table.read_incremental_records(t1)

Rust

// read the records between t1 (exclusive) and t2 (inclusive)
let batches = hudi_table.read_incremental_records(t1, Some(t2)).await?;

// read the records after t1
let batches = hudi_table.read_incremental_records(t1, None).await?;

Incremental queries support the same timestamp formats as time-travel queries.

File Group Reading (Experimental)

File group reading allows you to read data from a specific file slice. This is useful when integrating with query engines, where the plan provides file paths.

Python

from hudi import HudiFileGroupReader

reader = HudiFileGroupReader(
    "/table/base/path", {"hoodie.read.file_group.start_timestamp": "0"})

# Returns a PyArrow RecordBatch
record_batch = reader.read_file_slice_by_base_file_path("relative/path.parquet")

Rust

use hudi::file_group::reader::FileGroupReader;

let reader = FileGroupReader::new_with_options(
    "/table/base/path", [("hoodie.read.file_group.start_timestamp", "0")])?;

// Returns an Arrow RecordBatch
let record_batch = reader.read_file_slice_by_base_file_path("relative/path.parquet").await?;

C++

#include "cxx.h"
#include "src/lib.rs.h"
#include "arrow/c/abi.h"

auto reader = new_file_group_reader_with_options(
    "/table/base/path", {"hoodie.read.file_group.start_timestamp=0"});

// Returns an ArrowArrayStream pointer
ArrowArrayStream* stream_ptr = reader->read_file_slice_by_base_file_path("relative/path.parquet");

Query Engine Integration

Hudi-rs provides APIs to support integration with query engines. The sections below highlight some commonly used APIs.

Table API

Create a Hudi table instance using its constructor or the TableBuilder API.

Query planning
  • get_file_slices(): for snapshot queries, get a list of file slices.
  • get_file_slices_splits(): for snapshot queries, get a list of file slices in splits.
  • get_file_slices_as_of(): for time-travel queries, get a list of file slices at a given time.
  • get_file_slices_splits_as_of(): for time-travel queries, get a list of file slices in splits at a given time.
  • get_file_slices_between(): for incremental queries, get a list of changed file slices for a given time range.

Query execution
  • create_file_group_reader_with_options(): create a file group reader instance with the table instance's configs.
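
For example, a minimal planning sketch in Rust, assuming get_file_slices() accepts the same filter tuples as read_snapshot() (exact signatures may vary by version):

use hudi::table::builder::TableBuilder as HudiTableBuilder;

let hudi_table = HudiTableBuilder::from_base_uri("/tmp/trips_table").build().await?;
// Each returned file slice would typically become one scan task in the engine's plan.
let file_slices = hudi_table.get_file_slices(&[("city", "=", "san_francisco")]).await?;
println!("planned {} file slices", file_slices.len());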

File Group API

Create a Hudi file group reader instance using its constructor or the Hudi table API create_file_group_reader_with_options().

Query execution
  • read_file_slice(): read records from a given file slice; based on the configs, read records from only the base file, or from the base file and log files, and merge records based on the configured strategy.
  • read_file_slice_by_base_file_path(): read records from a given base file path; log files will be ignored.
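
Continuing the planning sketch above, a hedged sketch of the execution stage; the options argument to create_file_group_reader_with_options() and the base_file_relative_path() accessor on a file slice are assumptions here and may differ in your version:

// Create a reader that inherits the table instance's configs, plus extra read options.
let reader = hudi_table
    .create_file_group_reader_with_options([("hoodie.read.file_group.start_timestamp", "0")])?;
for slice in &file_slices {
    // Assumed accessor for the slice's base file path, relative to the table's base URI.
    let relative_path = slice.base_file_relative_path()?;
    let record_batch = reader.read_file_slice_by_base_file_path(&relative_path).await?;
    println!("read {} rows from {}", record_batch.num_rows(), relative_path);
}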

Apache DataFusion

Enabling the hudi crate with the datafusion feature provides a DataFusion extension for querying Hudi tables.

cargo new my_project --bin && cd my_project
cargo add tokio@1 datafusion@45
cargo add hudi --features datafusion

Update src/main.rs with the code snippet below, then run cargo run.

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::{DataFrame, SessionContext};
use hudi::HudiDataSource;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let hudi = HudiDataSource::new_with_options(
        "/tmp/trips_table",
        [("hoodie.read.input.partitions", "5")]).await?;
    ctx.register_table("trips_table", Arc::new(hudi))?;
    let df: DataFrame = ctx.sql("SELECT * from trips_table where city = 'san_francisco'").await?;
    df.show().await?;
    Ok(())
}

Other Integrations

Hudi-rs is also integrated with other query engines and libraries; see the project documentation for the complete list.

Work with cloud storage

Ensure cloud storage credentials are set properly as environment variables, e.g., AWS_*, AZURE_*, or GOOGLE_*. The relevant storage environment variables will then be picked up, and the target table's base URI with a scheme such as s3://, az://, or gs:// will be processed accordingly.

Alternatively, you can pass the storage configuration as options via Table APIs.

Python

from hudi import HudiTableBuilder

hudi_table = (
    HudiTableBuilder
    .from_base_uri("s3://bucket/trips_table")
    .with_option("aws_region", "us-west-2")
    .build()
)

Rust

use hudi::error::Result;
use hudi::table::builder::TableBuilder as HudiTableBuilder;

#[tokio::main]
async fn main() -> Result<()> {
    let hudi_table =
        HudiTableBuilder::from_base_uri("s3://bucket/trips_table")
        .with_option("aws_region", "us-west-2")
        .build().await?;
    Ok(())
}

Contributing

Check out the contributing guide for all the details about making contributions to the project.