// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Embedding and using a custom index in Parquet files
//!
//! # Background
//!
//! This example shows how to add an application‑specific index to an Apache
//! Parquet file without modifying the Parquet format itself. The resulting
//! files can be read by any standard Parquet reader, which will simply
//! ignore the extra index data.
//!
//! A “distinct value” index, similar to a ["set" Skip Index in ClickHouse],
//! is stored in a custom binary format within the Parquet file. Only the
//! location of the index is stored in the Parquet footer key/value metadata.
//! This approach is more efficient than storing the index itself in the footer
//! metadata because the footer must be read and parsed by all readers,
//! even those that do not use the index.
//!
//! This example uses a file level index for skipping entire files, but any
//! index can be stored using the same techniques and used to skip row groups,
//! data pages, or rows using the APIs on [`TableProvider`] and [`ParquetSource`].
//!
//! The resulting Parquet file layout is as follows:
//!
//! ```text
//! ┌──────────────────────┐
//! │┌───────────────────┐ │
//! ││ DataPage │ │
//! │└───────────────────┘ │
//! Standard Parquet │┌───────────────────┐ │
//! Data Pages ││ DataPage │ │
//! │└───────────────────┘ │
//! │ ... │
//! │┌───────────────────┐ │
//! ││ DataPage │ │
//! │└───────────────────┘ │
//! │┏━━━━━━━━━━━━━━━━━━━┓ │
//! Non standard │┃ ┃ │
//! index (ignored by │┃Custom Binary Index┃ │
//! other Parquet │┃ (Distinct Values) ┃◀│─ ─ ─
//! readers) │┃ ┃ │ │
//! │┗━━━━━━━━━━━━━━━━━━━┛ │
//! Standard Parquet │┏━━━━━━━━━━━━━━━━━━━┓ │ │ key/value metadata
//! Page Index │┃ Page Index ┃ │ contains location
//! │┗━━━━━━━━━━━━━━━━━━━┛ │ │ of special index
//! │╔═══════════════════╗ │
//! │║ Parquet Footer w/ ║ │ │
//! │║ Metadata ║ ┼ ─ ─
//! │║ (Thrift Encoded) ║ │
//! │╚═══════════════════╝ │
//! └──────────────────────┘
//!
//! Parquet File
//! ```
//!
//! # High Level Flow
//!
//! To create a custom Parquet index:
//!
//! 1. Compute the index and serialize it to a binary format.
//!
//! 2. Write the Parquet file with:
//! - regular data pages
//! - the serialized index inline
//! - footer key/value metadata entry to locate the index
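//!
//! As a minimal sketch of the last two pieces of step 2 (the helper name
//! `append_index` is made up for this illustration; the `ArrowWriter` methods
//! it calls are the same ones used later in this example):
//!
//! ```no_run
//! use datafusion::common::Result;
//! use datafusion::parquet::arrow::ArrowWriter;
//! use datafusion::parquet::file::metadata::KeyValue;
//! use std::io::Write;
//!
//! /// Append already-serialized index bytes to an open `ArrowWriter` and
//! /// record their location in the footer key/value metadata.
//! fn append_index<W: Write + Send>(
//!     writer: &mut ArrowWriter<W>,
//!     index_bytes: &[u8],
//! ) -> Result<()> {
//!     // Offset where the index will begin (after all data written so far)
//!     let offset = writer.bytes_written();
//!     // Write the index bytes inline, after the data pages
//!     writer.write_all(index_bytes)?;
//!     // Record the location under a key that index-aware readers look for
//!     writer.append_key_value_metadata(KeyValue::new(
//!         "distinct_index_offset".to_string(),
//!         offset.to_string(),
//!     ));
//!     Ok(())
//! }
//! ```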
//!
//! To read and use the index:
//!
//! 1. Read and deserialize the file’s footer to locate the index.
//!
//! 2. Read and deserialize the index.
//!
//! 3. Create a `TableProvider` that knows how to use the index to quickly find
//!    the relevant files, row groups, data pages or rows based on pushed down
//!    filters.
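//!
//! A minimal sketch of step 1 (the helper name `locate_index` is hypothetical;
//! it uses the same `SerializedFileReader` API as the `read_distinct_index`
//! function later in this file):
//!
//! ```no_run
//! use datafusion::common::Result;
//! use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
//! use std::fs::File;
//!
//! /// Return the byte offset of the custom index recorded in the footer
//! /// key/value metadata, if present.
//! fn locate_index(path: &str) -> Result<Option<u64>> {
//!     let reader = SerializedFileReader::new(File::open(path)?)?;
//!     let offset: Option<u64> = reader
//!         .metadata()
//!         .file_metadata()
//!         .key_value_metadata()
//!         .and_then(|kvs| kvs.iter().find(|kv| kv.key == "distinct_index_offset"))
//!         .and_then(|kv| kv.value.as_deref())
//!         .and_then(|v| v.parse().ok());
//!     Ok(offset)
//! }
//! ```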
//!
//! # FAQ: Why do other Parquet readers skip over the custom index?
//!
//! The flow for reading a Parquet file is:
//!
//! 1. Seek to the end of the file and read the last 8 bytes (a 4‑byte
//! little‑endian footer length followed by the `PAR1` magic bytes).
//!
//! 2. Seek backwards by that length to parse the Thrift‑encoded footer
//! metadata (including key/value pairs).
//!
//! 3. Read the data required for decoding (such as data pages) based on the
//!    offsets encoded in the metadata.
//!
//! Since Parquet readers do not scan from the start of the file, they will not
//! read data in the file unless it is explicitly referenced in the footer
//! metadata.
//!
//! Thus other readers will encounter and ignore an unknown key
//! (`distinct_index_offset`) in the footer key/value metadata. Unless they
//! know how to use that information, they will not attempt to read or
//! interpret the bytes that make up the index.
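//!
//! The following standalone sketch (the helper name `read_footer_len` is made
//! up for this illustration) shows how a reader locates the footer using only
//! the 8 byte tail of the file, which is why bytes not referenced from the
//! footer metadata are never read:
//!
//! ```no_run
//! use std::fs::File;
//! use std::io::{Read, Seek, SeekFrom};
//!
//! /// Read the 8 byte tail of a Parquet file and return the length of the
//! /// Thrift-encoded footer metadata that precedes it.
//! fn read_footer_len(path: &str) -> std::io::Result<u64> {
//!     let mut file = File::open(path)?;
//!     let mut tail = [0u8; 8];
//!     file.seek(SeekFrom::End(-8))?;
//!     file.read_exact(&mut tail)?;
//!     // The tail is a 4-byte little-endian length followed by `PAR1`
//!     assert_eq!(&tail[4..], b"PAR1", "not a Parquet file");
//!     Ok(u32::from_le_bytes(tail[..4].try_into().unwrap()) as u64)
//! }
//! ```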
//!
//! ["set" Skip Index in ClickHouse]: https://clickhouse.com/docs/optimize/skipping-indexes#set
use arrow::array::{ArrayRef, StringArray};
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::catalog::{Session, TableProvider};
use datafusion::common::{exec_err, HashMap, HashSet, Result};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::memory::DataSourceExec;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::TableType;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::{Operator, TableProviderFilterPushDown};
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::errors::ParquetError;
use datafusion::parquet::file::metadata::{FileMetaData, KeyValue};
use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use std::fs::{read_dir, File};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;
/// An index of distinct values for a single column
///
/// In this example the index is a simple set of strings, but in a real
/// application it could be any arbitrary data structure.
///
/// Also, this example indexes the distinct values for an entire file
/// but a real application could create multiple indexes for multiple
/// row groups and/or columns, depending on the use case.
#[derive(Debug, Clone)]
struct DistinctIndex {
inner: HashSet<String>,
}
impl DistinctIndex {
/// Create a DistinctIndex from an iterator of strings
pub fn new<I: IntoIterator<Item = String>>(iter: I) -> Self {
Self {
inner: iter.into_iter().collect(),
}
}
/// Returns true if the index contains the given value
pub fn contains(&self, value: &str) -> bool {
self.inner.contains(value)
}
/// Serialize the distinct index to a writer as bytes
///
/// In this example, we use a simple newline-separated format,
/// but a real application can use any arbitrary binary format.
///
/// Note that we must use the ArrowWriter to write the index so that its
/// internal accounting of offsets correctly tracks the actual size of the
/// file. If we wrote directly to the underlying writer, the Page Index
/// written right before the footer would be incorrect, because its offsets
/// would not account for the extra bytes written.
fn serialize<W: Write + Send>(
&self,
arrow_writer: &mut ArrowWriter<W>,
) -> Result<()> {
let serialized = self
.inner
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>()
.join("\n");
let index_bytes = serialized.into_bytes();
// Set the offset for the index
let offset = arrow_writer.bytes_written();
let index_len = index_bytes.len() as u64;
println!("Writing custom index at offset: {offset}, length: {index_len}");
// Write the index magic and length to the file
arrow_writer.write_all(INDEX_MAGIC)?;
arrow_writer.write_all(&index_len.to_le_bytes())?;
// Write the index bytes
arrow_writer.write_all(&index_bytes)?;
// Append metadata about the index to the Parquet file footer
arrow_writer.append_key_value_metadata(KeyValue::new(
"distinct_index_offset".to_string(),
offset.to_string(),
));
Ok(())
}
/// Read the distinct values index from a reader, starting at the given offset
pub fn new_from_reader<R: Read + Seek>(mut reader: R, offset: u64) -> Result<Self> {
reader.seek(SeekFrom::Start(offset))?;
let mut magic_buf = [0u8; 4];
reader.read_exact(&mut magic_buf)?;
if magic_buf != INDEX_MAGIC {
return exec_err!("Invalid index magic number at offset {offset}");
}
let mut len_buf = [0u8; 8];
reader.read_exact(&mut len_buf)?;
let stored_len = u64::from_le_bytes(len_buf) as usize;
let mut index_buf = vec![0u8; stored_len];
reader.read_exact(&mut index_buf)?;
let Ok(s) = String::from_utf8(index_buf) else {
return exec_err!("Invalid UTF-8 in index data");
};
Ok(Self {
inner: s.lines().map(|s| s.to_string()).collect(),
})
}
}
/// DataFusion [`TableProvider`] that reads Parquet files and uses a
/// `DistinctIndex` to prune files based on pushed down filters.
#[derive(Debug)]
struct DistinctIndexTable {
/// The schema of the table
schema: SchemaRef,
/// Key is file name, value is DistinctIndex for that file
files_and_index: HashMap<String, DistinctIndex>,
/// Directory containing the Parquet files
dir: PathBuf,
}
impl DistinctIndexTable {
/// Create a new DistinctIndexTable for files in the given directory
///
/// Scans the directory, reading the `DistinctIndex` from each file
fn try_new(dir: impl Into<PathBuf>, schema: SchemaRef) -> Result<Self> {
let dir = dir.into();
let mut index = HashMap::new();
for entry in read_dir(&dir)? {
let path = entry?.path();
if path.extension().and_then(|s| s.to_str()) != Some("parquet") {
continue;
}
let file_name = path.file_name().unwrap().to_string_lossy().to_string();
let distinct_set = read_distinct_index(&path)?;
println!("Read distinct index for {file_name}: {file_name:?}");
index.insert(file_name, distinct_set);
}
Ok(Self {
schema,
files_and_index: index,
dir,
})
}
}
/// Wrapper around ArrowWriter to write Parquet files with an embedded index
struct IndexedParquetWriter<W: Write + Seek> {
writer: ArrowWriter<W>,
}
/// Magic bytes to identify our custom index format
const INDEX_MAGIC: &[u8] = b"IDX1";
impl<W: Write + Seek + Send> IndexedParquetWriter<W> {
pub fn try_new(sink: W, schema: Arc<Schema>) -> Result<Self> {
let writer = ArrowWriter::try_new(sink, schema, None)?;
Ok(Self { writer })
}
/// Write a RecordBatch to the Parquet file
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.writer.write(batch)?;
Ok(())
}
/// Flush the current row group
pub fn flush(&mut self) -> Result<()> {
self.writer.flush()?;
Ok(())
}
/// Close the Parquet file, flushing any remaining data
pub fn close(self) -> Result<()> {
self.writer.close()?;
Ok(())
}
/// Write the DistinctIndex to the Parquet file
pub fn write_index(&mut self, index: &DistinctIndex) -> Result<()> {
index.serialize(&mut self.writer)
}
}
/// Write a Parquet file with a single column "category" containing the
/// strings in `values` and a DistinctIndex for that column.
fn write_file_with_index(path: &Path, values: &[&str]) -> Result<()> {
// form an input RecordBatch with the string values
let field = Field::new("category", DataType::Utf8, false);
let schema = Arc::new(Schema::new(vec![field.clone()]));
let arr: ArrayRef = Arc::new(StringArray::from(values.to_vec()));
let batch = RecordBatch::try_new(schema.clone(), vec![arr])?;
// compute the distinct index
let distinct_index: DistinctIndex =
DistinctIndex::new(values.iter().map(|s| (*s).to_string()));
let file = File::create(path)?;
let mut writer = IndexedParquetWriter::try_new(file, schema.clone())?;
writer.write(&batch)?;
writer.flush()?;
writer.write_index(&distinct_index)?;
writer.close()?;
println!("Finished writing file to {}", path.display());
Ok(())
}
/// Read a `DistinctIndex` from a Parquet file
fn read_distinct_index(path: &Path) -> Result<DistinctIndex> {
let file = File::open(path)?;
let file_size = file.metadata()?.len();
println!("Reading index from {} (size: {file_size})", path.display(),);
let reader = SerializedFileReader::new(file.try_clone()?)?;
let meta = reader.metadata().file_metadata();
let offset = get_key_value(meta, "distinct_index_offset")
.ok_or_else(|| ParquetError::General("Missing index offset".into()))?
.parse::<u64>()
.map_err(|e| ParquetError::General(e.to_string()))?;
println!("Reading index at offset: {offset}, length");
DistinctIndex::new_from_reader(file, offset)
}
/// Returns the value of a named key from the Parquet file metadata
///
/// Returns None if the key is not found
fn get_key_value<'a>(file_meta_data: &'a FileMetaData, key: &'_ str) -> Option<&'a str> {
let kvs = file_meta_data.key_value_metadata()?;
let kv = kvs.iter().find(|kv| kv.key == key)?;
kv.value.as_deref()
}
/// Implement TableProvider for DistinctIndexTable, using the distinct index to prune files
#[async_trait]
impl TableProvider for DistinctIndexTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn table_type(&self) -> TableType {
TableType::Base
}
/// Prune files before reading: only keep files whose distinct set
/// contains the filter value
async fn scan(
&self,
_ctx: &dyn Session,
_proj: Option<&Vec<usize>>,
filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// This example only handles filters of the form
// `category = 'X'` where X is a string literal
//
// You can use `PruningPredicate` for much more general range and
// equality analysis or write your own custom logic.
let mut target: Option<&str> = None;
if filters.len() == 1 {
if let Expr::BinaryExpr(expr) = &filters[0] {
if expr.op == Operator::Eq {
if let (
Expr::Column(c),
Expr::Literal(ScalarValue::Utf8(Some(v)), _),
) = (&*expr.left, &*expr.right)
{
if c.name == "category" {
println!("Filtering for category: {v}");
target = Some(v);
}
}
}
}
}
// Determine which files to scan
let files_to_scan: Vec<_> = self
.files_and_index
.iter()
.filter_map(|(f, distinct_index)| {
// keep file if no target or target is in the distinct set
if target.is_none() || distinct_index.contains(target?) {
Some(f)
} else {
None
}
})
.collect();
println!("Scanning only files: {files_to_scan:?}");
// Build ParquetSource to actually read the files
let url = ObjectStoreUrl::parse("file://")?;
let source = Arc::new(ParquetSource::default().with_enable_page_index(true));
let mut builder = FileScanConfigBuilder::new(url, self.schema.clone(), source);
for file in files_to_scan {
let path = self.dir.join(file);
let len = std::fs::metadata(&path)?.len();
// If the index contained information about row groups or pages,
// you could also pass that information here to further prune
// the data read from the file.
let partitioned_file =
PartitionedFile::new(path.to_str().unwrap().to_string(), len);
builder = builder.with_file(partitioned_file);
}
Ok(DataSourceExec::from_data_source(builder.build()))
}
/// Tell DataFusion that we can handle filters on the "category" column
fn supports_filters_pushdown(
&self,
fs: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
// Mark as inexact since pruning is file‑granular
Ok(vec![TableProviderFilterPushDown::Inexact; fs.len()])
}
}
#[tokio::main]
async fn main() -> Result<()> {
// 1. Create temp dir and write 3 Parquet files with different category sets
let tmp = TempDir::new()?;
let dir = tmp.path();
write_file_with_index(&dir.join("a.parquet"), &["foo", "bar", "foo"])?;
write_file_with_index(&dir.join("b.parquet"), &["baz", "qux"])?;
write_file_with_index(&dir.join("c.parquet"), &["foo", "quux", "quux"])?;
// 2. Register our custom TableProvider
let field = Field::new("category", DataType::Utf8, false);
let schema_ref = Arc::new(Schema::new(vec![field]));
let provider = Arc::new(DistinctIndexTable::try_new(dir, schema_ref.clone())?);
let ctx = SessionContext::new();
ctx.register_table("t", provider)?;
// 3. Run a query: only files containing 'foo' get scanned; the rest are
//    pruned based on the distinct index.
let df = ctx.sql("SELECT * FROM t WHERE category = 'foo'").await?;
df.show().await?;
Ok(())
}