Apache DataFusion is a fast, extensible query engine for building data-centric systems in Rust. The paimon-datafusion crate provides a read-only integration that lets you query Paimon tables using SQL.
```toml
[dependencies]
paimon = "0.0.0"
paimon-datafusion = "0.0.0"
datafusion = "52"
tokio = { version = "1", features = ["full"] }
```
Register an entire Paimon catalog so that all of its databases and tables are accessible through the `catalog.database.table` syntax:
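```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use paimon_datafusion::PaimonCatalogProvider;

// `catalog` is a Paimon catalog created beforehand (its construction is not shown here).
// Run this inside an async function that returns a `Result`, e.g. under `#[tokio::main]`.
let ctx = SessionContext::new();

// Expose every database and table of the Paimon catalog under the catalog name "paimon".
ctx.register_catalog("paimon", Arc::new(PaimonCatalogProvider::new(Arc::new(catalog))));

// Query a table by its fully qualified catalog.database.table name.
let df = ctx.sql("SELECT * FROM paimon.default.my_table").await?;
df.show().await?;
```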
Paimon supports time travel queries to read historical data. In DataFusion, this is done via the FOR SYSTEM_TIME AS OF clause.
Read data from a specific snapshot by passing an integer literal:
```sql
SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 1
```
This sets the scan.snapshot-id option and reads exactly that snapshot.
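As a rough illustration of that step, the sketch below turns the integer literal into the `scan.snapshot-id` option. The helper name `snapshot_id_options` is hypothetical. Returning a plain `HashMap` of options and rejecting ids below 1 are assumptions of this sketch, not the paimon-datafusion API.

```rust
use std::collections::HashMap;

/// Hypothetical helper: maps the integer from `FOR SYSTEM_TIME AS OF <n>`
/// to the `scan.snapshot-id` table option.
fn snapshot_id_options(snapshot_id: i64) -> Result<HashMap<String, String>, String> {
    // Paimon snapshot ids start at 1, so this sketch rejects anything smaller.
    if snapshot_id < 1 {
        return Err(format!("invalid snapshot id: {snapshot_id}"));
    }
    let mut options = HashMap::new();
    // The option value is the snapshot id rendered as a string.
    options.insert("scan.snapshot-id".to_string(), snapshot_id.to_string());
    Ok(options)
}

fn main() {
    match snapshot_id_options(1) {
        Ok(options) => println!("{options:?}"),
        Err(e) => eprintln!("{e}"),
    }
}
```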
Read data as of a specific point in time by passing a timestamp string in YYYY-MM-DD HH:MM:SS format:
```sql
SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF '2024-01-01 00:00:00'
```
This finds the latest snapshot whose commit time is less than or equal to the given timestamp. The timestamp is interpreted in the local timezone.
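The lookup rule can be sketched in plain Rust. The sketch makes two assumptions. First, snapshot commit times are available as epoch milliseconds. Second, the timestamp string has already been converted from local time to epoch milliseconds. That conversion needs a timezone-aware library such as chrono, so it is left out here. `Snapshot` and `snapshot_as_of` are hypothetical names, and the linear scan is a simplification. A real implementation may search the snapshots more efficiently.

```rust
/// Minimal snapshot metadata for this sketch; real Paimon snapshots carry more fields.
#[derive(Debug, Clone, Copy)]
struct Snapshot {
    id: u64,
    /// Commit time in milliseconds since the Unix epoch.
    commit_time_millis: i64,
}

/// Returns the latest snapshot committed at or before `as_of_millis`,
/// or `None` if every snapshot was committed after that instant.
fn snapshot_as_of(snapshots: &[Snapshot], as_of_millis: i64) -> Option<Snapshot> {
    snapshots
        .iter()
        // Keep only snapshots whose commit time is <= the requested time.
        .filter(|s| s.commit_time_millis <= as_of_millis)
        // Snapshot ids grow with every commit, so the largest id is the latest.
        .max_by_key(|s| s.id)
        .copied()
}

fn main() {
    let snapshots = [
        Snapshot { id: 1, commit_time_millis: 1_000 },
        Snapshot { id: 2, commit_time_millis: 2_000 },
        Snapshot { id: 3, commit_time_millis: 3_000 },
    ];
    // Falls between the commits of snapshots 2 and 3, so snapshot 2 is selected.
    let selected = snapshot_as_of(&snapshots, 2_500);
    println!("{:?}", selected.map(|s| s.id));
}
```

If every snapshot was committed after the requested time, this sketch returns `None`. How paimon-datafusion reports that case is not covered here.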
Read data from a named tag by passing a string that is not a timestamp:
```sql
SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 'my_tag'
```
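The rule for choosing between the three forms can be sketched as follows:

- An integer literal selects a snapshot id.
- A string in `YYYY-MM-DD HH:MM:SS` form selects a timestamp.
- Any other string is treated as a tag name.

`AsOfLiteral`, `TimeTravel`, `looks_like_timestamp` and `classify` are hypothetical names. The timestamp check here only inspects the shape of the string, whereas the crate may use a full datetime parser.

```rust
/// The argument of `FOR SYSTEM_TIME AS OF`, as written in the SQL text.
#[derive(Debug)]
enum AsOfLiteral<'a> {
    Integer(i64),
    Str(&'a str),
}

/// The kind of time travel the argument selects.
#[derive(Debug, PartialEq)]
enum TimeTravel<'a> {
    SnapshotId(i64),
    Timestamp(&'a str),
    Tag(&'a str),
}

/// Shape check for `YYYY-MM-DD HH:MM:SS`. Simplification: field ranges
/// (e.g. month 13) are not validated, unlike a real datetime parser.
fn looks_like_timestamp(s: &str) -> bool {
    let b = s.as_bytes();
    b.len() == 19
        && b.iter().enumerate().all(|(i, &c)| match i {
            4 | 7 => c == b'-',
            10 => c == b' ',
            13 | 16 => c == b':',
            _ => c.is_ascii_digit(),
        })
}

/// Integer -> snapshot id; timestamp-shaped string -> timestamp; any other string -> tag.
fn classify<'a>(arg: AsOfLiteral<'a>) -> TimeTravel<'a> {
    match arg {
        AsOfLiteral::Integer(id) => TimeTravel::SnapshotId(id),
        AsOfLiteral::Str(s) if looks_like_timestamp(s) => TimeTravel::Timestamp(s),
        AsOfLiteral::Str(s) => TimeTravel::Tag(s),
    }
}

fn main() {
    for arg in [
        AsOfLiteral::Integer(1),
        AsOfLiteral::Str("2024-01-01 00:00:00"),
        AsOfLiteral::Str("my_tag"),
    ] {
        println!("{:?}", classify(arg));
    }
}
```

Under this sketch, a date-only string such as '2024-01-01' falls through to the tag branch. Check how the crate parses timestamps before relying on any other format.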
Tags are named snapshots created through Paimon's tag management (e.g., `CALL sys.create_tag(...)` in Spark). Querying by tag pins a stable version of the data, so repeated queries return the same result.
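To illustrate why a tag gives reproducible results, the toy model below pins a snapshot under a name. It shows that later commits move the latest snapshot but leave the tag unchanged. `TableHistory` and its methods are hypothetical and do not reflect how Paimon actually stores snapshots or tags.

```rust
use std::collections::HashMap;

/// Toy model: snapshot ids grow with each commit and tags map a name to a fixed id.
/// This is not how Paimon stores snapshots or tags.
struct TableHistory {
    latest_snapshot: u64,
    tags: HashMap<String, u64>,
}

impl TableHistory {
    fn new() -> Self {
        Self { latest_snapshot: 0, tags: HashMap::new() }
    }

    /// Records a commit and returns the id of the new snapshot.
    fn commit(&mut self) -> u64 {
        self.latest_snapshot += 1;
        self.latest_snapshot
    }

    /// Pins the current latest snapshot under `name`.
    fn create_tag(&mut self, name: &str) -> Result<(), String> {
        if self.latest_snapshot == 0 {
            return Err("cannot tag a table with no snapshots".to_string());
        }
        self.tags.insert(name.to_string(), self.latest_snapshot);
        Ok(())
    }

    /// Resolves a tag to the snapshot id it pins.
    fn resolve_tag(&self, name: &str) -> Result<u64, String> {
        self.tags
            .get(name)
            .copied()
            .ok_or_else(|| format!("tag not found: {name}"))
    }
}

fn main() {
    let mut table = TableHistory::new();
    table.commit();
    table.create_tag("my_tag").expect("table has a snapshot");
    // Later commits advance the latest snapshot...
    table.commit();
    table.commit();
    // ...but the tag still resolves to the snapshot it pinned.
    println!("latest = {}, my_tag = {:?}", table.latest_snapshot, table.resolve_tag("my_tag"));
}
```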
DataFusion requires the BigQuery SQL dialect to parse FOR SYSTEM_TIME AS OF. You also need to register the PaimonRelationPlanner:
```rust
use std::sync::Arc;

use datafusion::prelude::{SessionConfig, SessionContext};
use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner};

// Switch the SQL parser to the BigQuery dialect so that FOR SYSTEM_TIME AS OF is parsed.
let config = SessionConfig::new()
    .set_str("datafusion.sql_parser.dialect", "BigQuery");
let ctx = SessionContext::new_with_config(config);

// `catalog` is a Paimon catalog created beforehand, as in the previous example.
ctx.register_catalog("paimon", Arc::new(PaimonCatalogProvider::new(Arc::new(catalog))));

// Register the Paimon relation planner alongside the catalog.
ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))?;

// Now time travel queries work.
let df = ctx.sql("SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 1").await?;
df.show().await?;
```