// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow::array::{
Array, ArrayRef, AsArray, BooleanArray, Int32Array, RecordBatch, StringArray,
UInt64Array,
};
use arrow::datatypes::Int32Type;
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use datafusion::parquet::arrow::{
arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
};
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::{utils::conjunction, TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::PhysicalExpr;
use std::any::Any;
use std::collections::HashSet;
use std::fmt::Display;
use std::fs::{self, DirEntry, File};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tempfile::TempDir;
use url::Url;
/// This example demonstrates building a secondary index over multiple Parquet
/// files and using that index during query to skip ("prune") files that do not
/// contain relevant data.
///
/// This example rules out files that cannot contain matching data using the
/// min/max values of a column extracted from the Parquet metadata. In a real
/// system, the index could be more sophisticated, e.g. using inverted indices,
/// bloom filters or other techniques.
///
/// Note this is a low-level example for people who want to build their own
/// custom indexes. To read a directory of parquet files as a table, you can use
/// a higher-level API such as [`SessionContext::read_parquet`] or
/// [`ListingTable`], which also do file pruning based on parquet statistics
/// (using the same underlying APIs).
///
/// For a more advanced example of using an index to prune row groups within a
/// file, see the (forthcoming) `advanced_parquet_index` example.
///
/// # Diagram
///
/// ```text
/// ┏━━━━━━━━━━━━━━━━━━━━━━━━┓
/// ┃ Index ┃
/// ┃ ┃
/// step 1: predicate is ┌ ─ ─ ─ ─▶┃ (sometimes referred to ┃
/// evaluated against ┃ as a "catalog" or ┃
/// data in the index │ ┃ "metastore") ┃
/// (using ┗━━━━━━━━━━━━━━━━━━━━━━━━┛
/// PruningPredicate) │ │
///
/// │ │
/// ┌──────────────┐
/// │ value = 150 │─ ─ ─ ─ ┘ │
/// └──────────────┘ ┌─────────────┐
/// Predicate from query │ │ │
/// └─────────────┘
/// │ ┌─────────────┐
/// step 2: Index returns only ─ ▶│ │
/// parquet files that might have └─────────────┘
/// matching data. ...
/// ┌─────────────┐
/// Thus some parquet files are │ │
/// "pruned" and thus are not └─────────────┘
/// scanned at all Parquet Files
///
/// ```
///
/// [`ListingTable`]: datafusion::datasource::listing::ListingTable
#[tokio::main]
async fn main() -> Result<()> {
// Demo data has three files, each with schema
// * file_name (string)
// * value (int32)
//
// The files are as follows:
// * file1.parquet (value: 0..100)
// * file2.parquet (value: 100..200)
// * file3.parquet (value: 200..3000)
let data = DemoData::try_new()?;
    // Create a table provider with our special index.
let provider = Arc::new(IndexTableProvider::try_new(data.path())?);
println!("** Table Provider:");
println!("{provider}\n");
// Create a SessionContext for running queries that has the table provider
// registered as "index_table"
let ctx = SessionContext::new();
ctx.register_table("index_table", Arc::clone(&provider) as _)?;
    // register the object store so that URLs like `file://` work
let url = Url::try_from("file://").unwrap();
let object_store = object_store::local::LocalFileSystem::new();
ctx.register_object_store(&url, Arc::new(object_store));
// Select data from the table without any predicates (and thus no pruning)
println!("** Select data, no predicates:");
ctx.sql("SELECT file_name, value FROM index_table LIMIT 10")
.await?
.show()
.await?;
println!("Files pruned: {}\n", provider.index().last_num_pruned());
// Run a query that uses the index to prune files.
//
// Using the predicate "value = 150", the IndexTable can skip reading file 1
// (max value 100) and file 3 (min value of 200)
println!("** Select data, predicate `value = 150`");
ctx.sql("SELECT file_name, value FROM index_table WHERE value = 150")
.await?
.show()
.await?;
println!("Files pruned: {}\n", provider.index().last_num_pruned());
// likewise, we can use a more complicated predicate like
// "value < 20 OR value > 500" to read only file 1 and file 3
println!("** Select data, predicate `value < 20 OR value > 500`");
ctx.sql(
"SELECT file_name, count(value) FROM index_table \
WHERE value < 20 OR value > 500 GROUP BY file_name",
)
.await?
.show()
.await?;
println!("Files pruned: {}\n", provider.index().last_num_pruned());
Ok(())
}
/// DataFusion `TableProvider` that uses a [`ParquetMetadataIndex`], a
/// secondary index, to decide which Parquet files to read.
#[derive(Debug)]
pub struct IndexTableProvider {
/// The index of the parquet files in the directory
index: ParquetMetadataIndex,
/// the directory in which the files are stored
dir: PathBuf,
}
impl Display for IndexTableProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "IndexTableProvider")?;
writeln!(f, "---- Index ----")?;
write!(f, "{}", self.index)
}
}
impl IndexTableProvider {
/// Create a new IndexTableProvider
pub fn try_new(dir: impl Into<PathBuf>) -> Result<Self> {
let dir = dir.into();
// Create an index of the parquet files in the directory as we see them.
let mut index_builder = ParquetMetadataIndexBuilder::new();
let files = read_dir(&dir)?;
for file in &files {
index_builder.add_file(&file.path())?;
}
let index = index_builder.build()?;
Ok(Self { index, dir })
}
/// return a reference to the underlying index
fn index(&self) -> &ParquetMetadataIndex {
&self.index
}
}
#[async_trait]
impl TableProvider for IndexTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.index.schema().clone()
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
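        // A DFSchema of the table is needed to convert the logical filter
        // expressions into a single physical expression below.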
let df_schema = DFSchema::try_from(self.schema())?;
// convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2`
let predicate = conjunction(filters.to_vec());
let predicate = predicate
.map(|predicate| state.create_physical_expr(predicate, &df_schema))
.transpose()?
            // if there are no filters, use a literal `true` as a predicate
            // that always evaluates to true, which we can pass to the index
.unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true));
        // Use the index to find the files that might have data that matches the
        // predicate. Any file that cannot have data that matches the predicate
        // will not be returned.
let files = self.index.get_files(predicate.clone())?;
let object_store_url = ObjectStoreUrl::parse("file://")?;
let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
.with_projection(projection.cloned())
.with_limit(limit);
// Transform to the format needed to pass to ParquetExec
// Create one file group per file (default to scanning them all in parallel)
for (file_name, file_size) in files {
let path = self.dir.join(file_name);
let canonical_path = fs::canonicalize(path)?;
file_scan_config = file_scan_config.with_file(PartitionedFile::new(
canonical_path.display().to_string(),
file_size,
));
}
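        // In addition to the file-level pruning done above using the index,
        // the predicate is also passed to ParquetExec, which can use the
        // parquet metadata to prune row groups within each file.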
let exec = ParquetExec::builder(file_scan_config)
.with_predicate(predicate)
.build_arc();
Ok(exec)
}
/// Tell DataFusion to push filters down to the scan method
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
// Inexact because the pruning can't handle all expressions and pruning
// is not done at the row level -- there may be rows in returned files
// that do not pass the filter
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}
}
/// Simple in memory secondary index for a set of parquet files
///
/// The index is represented as an arrow [`RecordBatch`] that can be used
/// directly by the DataFusion [`PruningPredicate`] API.
///
/// The `RecordBatch` looks as follows.
///
/// ```text
/// +---------------+-----------+-----------+------------------+------------------+
/// | file_name | file_size | row_count | value_column_min | value_column_max |
/// +---------------+-----------+-----------+------------------+------------------+
/// | file1.parquet | 6062 | 100 | 0 | 99 |
/// | file2.parquet | 6062 | 100 | 100 | 199 |
/// | file3.parquet | 163310 | 2800 | 200 | 2999 |
/// +---------------+-----------+-----------+------------------+------------------+
/// ```
///
/// It must store file_name and file_size to construct `PartitionedFile`.
///
/// Note a more advanced index might store finer-grained information, such as
/// statistics about each row group within a file.
#[derive(Debug)]
struct ParquetMetadataIndex {
file_schema: SchemaRef,
/// The index of the parquet files. See the struct level documentation for
/// the schema of this index.
index: RecordBatch,
/// The number of files that were pruned in the last query
last_num_pruned: AtomicUsize,
}
impl Display for ParquetMetadataIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(
f,
"ParquetMetadataIndex(last_num_pruned: {})",
self.last_num_pruned()
)?;
let batches = pretty_format_batches(&[self.index.clone()]).unwrap();
write!(f, "{batches}",)
}
}
impl ParquetMetadataIndex {
/// the schema of the *files* in the index (not the index's schema)
fn schema(&self) -> &SchemaRef {
&self.file_schema
}
/// number of files in the index
fn len(&self) -> usize {
self.index.num_rows()
}
/// Return a [`PartitionedFile`] for the specified file offset
///
/// For example, if the index batch contained data like
///
/// ```text
/// fileA
/// fileB
/// fileC
/// ```
///
/// `get_file(1)` would return `(fileB, size)`
fn get_file(&self, file_offset: usize) -> (&str, u64) {
// Filenames and sizes are always non null, so we don't have to check is_valid
let file_name = self.file_names().value(file_offset);
let file_size = self.file_size().value(file_offset);
(file_name, file_size)
}
/// Return the number of files that were pruned in the last query
pub fn last_num_pruned(&self) -> usize {
self.last_num_pruned.load(Ordering::SeqCst)
}
/// Set the number of files that were pruned in the last query
fn set_last_num_pruned(&self, num_pruned: usize) {
self.last_num_pruned.store(num_pruned, Ordering::SeqCst);
}
/// Return all the files matching the predicate
///
/// Returns a tuple `(file_name, file_size)`
pub fn get_files(
&self,
predicate: Arc<dyn PhysicalExpr>,
) -> Result<Vec<(&str, u64)>> {
// Use the PruningPredicate API to determine which files can not
// possibly have any relevant data.
let pruning_predicate =
PruningPredicate::try_new(predicate, self.schema().clone())?;
// Now evaluate the pruning predicate into a boolean mask, one element per
// file in the index. If the mask is true, the file may have rows that
// match the predicate. If the mask is false, we know the file can not have *any*
// rows that match the predicate and thus can be skipped.
let file_mask = pruning_predicate.prune(self)?;
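        // Count how many files remain and record how many were pruned so it
        // can be reported via `last_num_pruned`.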
let num_left = file_mask.iter().filter(|x| **x).count();
self.set_last_num_pruned(self.len() - num_left);
// Return only files that match the predicate from the index
let files_and_sizes: Vec<_> = file_mask
.into_iter()
.enumerate()
.filter_map(|(file, keep)| {
if keep {
Some(self.get_file(file))
} else {
None
}
})
.collect();
Ok(files_and_sizes)
}
/// Return the file_names column of this index
fn file_names(&self) -> &StringArray {
self.index
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
}
/// Return the file_size column of this index
fn file_size(&self) -> &UInt64Array {
self.index
.column(1)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
}
/// Reference to the row count column
fn row_counts_ref(&self) -> &ArrayRef {
self.index.column(2)
}
/// Reference to the column minimum values
fn value_column_mins(&self) -> &ArrayRef {
self.index.column(3)
}
/// Reference to the column maximum values
fn value_column_maxes(&self) -> &ArrayRef {
self.index.column(4)
}
}
/// In order to use the [`PruningPredicate`] API, we need to provide DataFusion
/// with the required statistics via the [`PruningStatistics`] trait.
impl PruningStatistics for ParquetMetadataIndex {
/// return the minimum values for the value column
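    ///
    /// Returning `None` for any other column tells the `PruningPredicate` that
    /// no statistics are available, so it will not prune based on that column.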
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
if column.name.eq("value") {
Some(self.value_column_mins().clone())
} else {
None
}
}
/// return the maximum values for the value column
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
if column.name.eq("value") {
Some(self.value_column_maxes().clone())
} else {
None
}
}
/// return the number of "containers". In this example, each "container" is
/// a file (aka a row in the index)
fn num_containers(&self) -> usize {
self.len()
}
/// Return `None` to signal we don't have any information about null
    /// counts in the index.
fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
None
}
/// return the row counts for each file
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
Some(self.row_counts_ref().clone())
}
/// The `contained` API can be used with structures such as Bloom filters,
/// but is not used in this example, so return `None`
fn contained(
&self,
_column: &Column,
_values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
None
}
}
/// Builds a [`ParquetMetadataIndex`] from a set of parquet files
#[derive(Debug, Default)]
struct ParquetMetadataIndexBuilder {
file_schema: Option<SchemaRef>,
filenames: Vec<String>,
file_sizes: Vec<u64>,
row_counts: Vec<u64>,
/// Holds the min/max value of the value column for each file
value_column_mins: Vec<i32>,
value_column_maxs: Vec<i32>,
}
impl ParquetMetadataIndexBuilder {
fn new() -> Self {
Self::default()
}
/// Add a new file to the index
fn add_file(&mut self, file: &Path) -> Result<()> {
let file_name = file
.file_name()
.ok_or_else(|| internal_datafusion_err!("No filename"))?
.to_str()
.ok_or_else(|| internal_datafusion_err!("Invalid filename"))?;
let file_size = file.metadata()?.len();
let file = File::open(file).map_err(|e| {
DataFusionError::from(e).context(format!("Error opening file {file:?}"))
})?;
let reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
        // Get the schema of the file. A real system might have to handle the
        // case where the schema of the file is not the same as the schema of
        // the other files, e.g. by using a `SchemaAdapter`.
if self.file_schema.is_none() {
self.file_schema = Some(reader.schema().clone());
}
// extract the parquet statistics from the file's footer
let metadata = reader.metadata();
let row_groups = metadata.row_groups();
// Extract the min/max values for each row group from the statistics
let converter = StatisticsConverter::try_new(
"value",
reader.schema(),
reader.parquet_schema(),
)?;
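        // The converter returns the statistics as arrow arrays, with one
        // element per row group in the file.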
let row_counts = converter
.row_group_row_counts(row_groups.iter())?
.ok_or_else(|| {
internal_datafusion_err!("Row group row counts are missing")
})?;
let value_column_mins = converter.row_group_mins(row_groups.iter())?;
let value_column_maxes = converter.row_group_maxes(row_groups.iter())?;
// In a real system you would have to handle nulls, which represent
// unknown statistics. All statistics are known in this example
assert_eq!(row_counts.null_count(), 0);
assert_eq!(value_column_mins.null_count(), 0);
assert_eq!(value_column_maxes.null_count(), 0);
// The statistics gathered above are for each row group. We need to
// aggregate them together to compute the overall file row count,
// min and max.
let row_count = row_counts
.iter()
.flatten() // skip nulls (should be none)
.sum::<u64>();
let value_column_min = value_column_mins
.as_primitive::<Int32Type>()
.iter()
.flatten() // skip nulls (i.e. min is unknown)
.min()
.unwrap_or_default();
let value_column_max = value_column_maxes
.as_primitive::<Int32Type>()
.iter()
.flatten() // skip nulls (i.e. max is unknown)
.max()
.unwrap_or_default();
// sanity check the statistics
assert_eq!(row_count, metadata.file_metadata().num_rows() as u64);
self.add_row(
file_name,
file_size,
row_count,
value_column_min,
value_column_max,
);
Ok(())
}
    /// Add an entry for a single new file to the in-progress index
fn add_row(
&mut self,
file_name: impl Into<String>,
file_size: u64,
row_count: u64,
value_column_min: i32,
value_column_max: i32,
) {
self.filenames.push(file_name.into());
self.file_sizes.push(file_size);
self.row_counts.push(row_count);
self.value_column_mins.push(value_column_min);
self.value_column_maxs.push(value_column_max);
}
/// Build the index from the files added
fn build(self) -> Result<ParquetMetadataIndex> {
let Some(file_schema) = self.file_schema else {
return Err(internal_datafusion_err!("No files added to index"));
};
let file_name: ArrayRef = Arc::new(StringArray::from(self.filenames));
let file_size: ArrayRef = Arc::new(UInt64Array::from(self.file_sizes));
let row_count: ArrayRef = Arc::new(UInt64Array::from(self.row_counts));
let value_column_min: ArrayRef =
Arc::new(Int32Array::from(self.value_column_mins));
let value_column_max: ArrayRef =
Arc::new(Int32Array::from(self.value_column_maxs));
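        // Note: the column order here must match the accessor methods on
        // `ParquetMetadataIndex` (`file_names` reads column 0, `file_size`
        // column 1, and so on)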
let index = RecordBatch::try_from_iter(vec![
("file_name", file_name),
("file_size", file_size),
("row_count", row_count),
("value_column_min", value_column_min),
("value_column_max", value_column_max),
])?;
Ok(ParquetMetadataIndex {
file_schema,
index,
last_num_pruned: AtomicUsize::new(0),
})
}
}
/// Return a list of the directory entries in the given directory, sorted by name
fn read_dir(dir: &Path) -> Result<Vec<DirEntry>> {
let mut files = dir
.read_dir()
.map_err(|e| {
DataFusionError::from(e).context(format!("Error reading directory {dir:?}"))
})?
.map(|entry| {
entry.map_err(|e| {
DataFusionError::from(e)
.context(format!("Error reading directory entry in {dir:?}"))
})
})
.collect::<Result<Vec<DirEntry>>>()?;
files.sort_by_key(|entry| entry.file_name());
Ok(files)
}
/// Demonstration Data
///
/// Makes a directory with three parquet files
///
/// The schema of the files is
/// * file_name (string)
/// * value (int32)
///
/// The files are as follows:
/// * file1.parquet (values 0..100)
/// * file2.parquet (values 100..200)
/// * file3.parquet (values 200..3000)
struct DemoData {
tmpdir: TempDir,
}
impl DemoData {
fn try_new() -> Result<Self> {
let tmpdir = TempDir::new()?;
make_demo_file(tmpdir.path().join("file1.parquet"), 0..100)?;
make_demo_file(tmpdir.path().join("file2.parquet"), 100..200)?;
make_demo_file(tmpdir.path().join("file3.parquet"), 200..3000)?;
Ok(Self { tmpdir })
}
fn path(&self) -> PathBuf {
self.tmpdir.path().into()
}
}
/// Creates a new parquet file at the specified path.
///
/// The `value` column increases sequentially over the given `value_range`
/// with the following schema:
///
/// * file_name: Utf8
/// * value: Int32
fn make_demo_file(path: impl AsRef<Path>, value_range: Range<i32>) -> Result<()> {
let path = path.as_ref();
let file = File::create(path)?;
let filename = path
.file_name()
.ok_or_else(|| internal_datafusion_err!("No filename"))?
.to_str()
.ok_or_else(|| internal_datafusion_err!("Invalid filename"))?;
let num_values = value_range.len();
let file_names =
StringArray::from_iter_values(std::iter::repeat(&filename).take(num_values));
let values = Int32Array::from_iter_values(value_range);
let batch = RecordBatch::try_from_iter(vec![
("file_name", Arc::new(file_names) as ArrayRef),
("value", Arc::new(values) as ArrayRef),
])?;
let schema = batch.schema();
// write the actual values to the file
let props = None;
let mut writer = ArrowWriter::try_new(file, schema, props)?;
writer.write(&batch)?;
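    // closing the writer writes the parquet footer, which contains the min/max
    // statistics for each row group that the index is later built from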
writer.finish()?;
Ok(())
}