// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::catalog::Session;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::{
ParquetAccessPlan, ParquetExecBuilder,
};
use datafusion::datasource::physical_plan::{
parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
};
use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
use datafusion::parquet::schema::types::ColumnPath;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tempfile::TempDir;
use url::Url;
/// This example demonstrates using low level DataFusion APIs to read only
/// certain row groups and ranges from parquet files, based on external
/// information.
///
/// Using these APIs, you can instruct DataFusion's parquet reader to skip
/// ("prune") portions of files that do not contain relevant data. These APIs
/// can be useful for doing low latency queries over a large number of Parquet
/// files on remote storage (e.g. S3) where the cost of reading the metadata for
/// each file is high (e.g. because it requires a network round trip to the
/// storage service).
///
/// Depending on the information from the index, DataFusion can make a request
/// to the storage service (e.g. S3) to read only the necessary data.
///
/// Note that this example uses a hard coded index implementation. For a more
/// realistic example of creating an index to prune files, see the
/// `parquet_index.rs` example.
///
/// Specifically, this example illustrates how to:
/// 1. Use [`ParquetFileReaderFactory`] to avoid re-reading parquet metadata on each query
/// 2. Use [`PruningPredicate`] for predicate analysis
/// 3. Pass a row group selection to [`ParquetExec`]
/// 4. Pass a row selection (within a row group) to [`ParquetExec`]
///
/// Note this is a *VERY* low level example for people who want to build their
/// own custom indexes (e.g. for low latency queries). Most users should use
/// higher level APIs for reading parquet files:
/// [`SessionContext::read_parquet`] or [`ListingTable`], which also do file
/// pruning based on parquet statistics (using the same underlying APIs)
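///
/// For example, the same file could be read with the high level API in a
/// couple of lines (a minimal sketch; `"data.parquet"` is an illustrative
/// path and the calls are assumed to run inside an async context):
///
/// ```rust,ignore
/// use datafusion::prelude::*;
///
/// let ctx = SessionContext::new();
/// // DataFusion reads the footer itself and prunes row groups using the
/// // built in parquet statistics
/// let df = ctx
///     .read_parquet("data.parquet", ParquetReadOptions::default())
///     .await?;
/// df.show().await?;
/// ```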
///
/// # Diagram
///
/// This diagram shows how the `ParquetExec` is configured to do only a single
/// (range) read from a parquet file, for the data that is needed. It does
/// not read the file footer or any of the row groups that are not needed.
///
/// ```text
/// ┌───────────────────────┐ The TableProvider configures the
/// │ ┌───────────────────┐ │ ParquetExec:
/// │ │ │ │
/// │ └───────────────────┘ │
/// │ ┌───────────────────┐ │
/// Row │ │ │ │ 1. To read only specific Row
/// Groups │ └───────────────────┘ │ Groups (the ParquetExec tries
/// │ ┌───────────────────┐ │ to reduce this further based
/// │ │ │ │ on metadata)
/// │ └───────────────────┘ │ ┌────────────────────┐
/// │ ┌───────────────────┐ │ │ │
/// │ │ │◀┼ ─ ─ ┐ │ ParquetExec │
/// │ └───────────────────┘ │ │ (Parquet Reader) │
/// │ ... │ └ ─ ─ ─ ─│ │
/// │ ┌───────────────────┐ │ │ ╔═══════════════╗ │
/// │ │ │ │ │ ║ParquetMetadata║ │
/// │ └───────────────────┘ │ │ ╚═══════════════╝ │
/// │ ╔═══════════════════╗ │ └────────────────────┘
/// │ ║ Thrift metadata ║ │
/// │ ╚═══════════════════╝ │ 2. With cached ParquetMetadata, so
/// └───────────────────────┘ the ParquetExec does not re-read /
/// Parquet File decode the thrift footer
///
/// ```
///
/// Within a Row Group, Column Chunks store data in DataPages. This example also
/// shows how to configure the ParquetExec to read a `RowSelection` (row ranges)
/// which will skip unneeded data pages. This requires that the Parquet file has
/// a [Page Index].
///
/// ```text
/// ┌───────────────────────┐ If the RowSelection does not include any
/// │ ... │ rows from a particular Data Page, that
/// │ │ Data Page is not fetched or decoded.
/// │ ┌───────────────────┐ │ Note this requires a PageIndex
/// │ │ ┌──────────┐ │ │
/// Row │ │ │DataPage 0│ │ │ ┌────────────────────┐
/// Groups │ │ └──────────┘ │ │ │ │
/// │ │ ┌──────────┐ │ │ │ ParquetExec │
/// │ │ ... │DataPage 1│ ◀┼ ┼ ─ ─ ─ │ (Parquet Reader) │
/// │ │ └──────────┘ │ │ └ ─ ─ ─ ─ ─│ │
/// │ │ ┌──────────┐ │ │ │ ╔═══════════════╗ │
/// │ │ │DataPage 2│ │ │ If only rows │ ║ParquetMetadata║ │
/// │ │ └──────────┘ │ │ from DataPage 1 │ ╚═══════════════╝ │
/// │ └───────────────────┘ │ are selected, └────────────────────┘
/// │ │ only DataPage 1
/// │ ... │ is fetched and
/// │ │ decoded
/// │ ╔═══════════════════╗ │
/// │ ║ Thrift metadata ║ │
/// │ ╚═══════════════════╝ │
/// └───────────────────────┘
/// Parquet File
/// ```
///
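/// As a concrete sketch, an external index could express such a plan with the
/// `ParquetAccessPlan` and `RowSelection` APIs roughly as follows (the row
/// group count and indexes are illustrative, not the values produced by this
/// example):
///
/// ```rust,ignore
/// use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
/// use datafusion::parquet::arrow::arrow_reader::{RowSelection, RowSelector};
///
/// // start from a plan that skips all 3 row groups
/// let mut plan = ParquetAccessPlan::new_none(3);
/// // scan all of row group 1
/// plan.scan(1);
/// // within row group 2 (100 rows), read only rows 10..20
/// plan.scan_selection(
///     2,
///     RowSelection::from(vec![
///         RowSelector::skip(10),
///         RowSelector::select(10),
///         RowSelector::skip(80),
///     ]),
/// );
/// // the plan is then attached to a `PartitionedFile` via `with_extensions`
/// // before building the `ParquetExec` (see `IndexTableProvider::scan` below)
/// ```
///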
/// [`ListingTable`]: datafusion::datasource::listing::ListingTable
/// [Page Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
#[tokio::main]
async fn main() -> Result<()> {
// the object store is used to read the parquet files (in this case, it is
// a local file system, but in a real system it could be S3, GCS, etc)
let object_store: Arc<dyn ObjectStore> =
Arc::new(object_store::local::LocalFileSystem::new());
// Create a custom table provider with our special index.
let provider = Arc::new(IndexTableProvider::try_new(Arc::clone(&object_store))?);
// SessionContext for running queries that has the table provider
// registered as "index_table"
let ctx = SessionContext::new();
ctx.register_table("index_table", Arc::clone(&provider) as _)?;
// register the object store provider so that urls like `file://` work
let url = Url::try_from("file://").unwrap();
ctx.register_object_store(&url, object_store);
// Select data from the table without any predicates (and thus no pruning)
println!("** Select data, no predicates:");
ctx.sql("SELECT avg(id), max(text) FROM index_table")
.await?
.show()
.await?;
// the underlying parquet reader makes 10 IO requests, one for each row group
// Now, run a query that has a predicate that our index can handle
//
// For this query, the access plan specifies skipping 8 row groups
// and scanning 2 of them. The skipped row groups are not read at all:
//
// [Skip, Skip, Scan, Skip, Skip, Skip, Skip, Scan, Skip, Skip]
//
// Note that the parquet reader makes 2 IO requests - one for the data from
// each row group.
println!("** Select data, predicate `id IN (250, 750)`");
ctx.sql("SELECT text FROM index_table WHERE id IN (250, 750)")
.await?
.show()
.await?;
// Finally, demonstrate scanning sub ranges within the row groups.
// Parquet's minimum decode unit is a page, so specifying ranges
// within a row group can be used to skip pages within a row group.
//
// For this query, the access plan specifies skipping all but the last row
// group and within the last row group, reading only the row with id 950
//
// [Skip, Skip, Skip, Skip, Skip, Skip, Skip, Skip, Skip, Selection(skip 50, select 1, skip 49)]
//
// Note that the parquet reader makes a single IO request - for the data
// pages that must be decoded
//
// Note: in order to prune pages, the Page Index must be loaded and the
// ParquetExec will load it on demand if not present. To avoid a second IO
// during the query, this example loads the Page Index pre-emptively by setting
// `ArrowReaderOptions::with_page_index` in `IndexedFile::try_new`
provider.set_use_row_selection(true);
println!("** Select data, predicate `id = 950`");
ctx.sql("SELECT text FROM index_table WHERE id = 950")
.await?
.show()
.await?;
Ok(())
}
/// DataFusion `TableProvider` that uses knowledge of how data is distributed in
/// a file to prune row groups and rows from the file.
///
/// The indexed file (`indexed_file.parquet`) contains values `0..1000`
#[derive(Debug)]
pub struct IndexTableProvider {
/// Where the file is stored (cleanup on drop)
#[allow(dead_code)]
tmpdir: TempDir,
/// The file that is being read.
indexed_file: IndexedFile,
/// The underlying object store
object_store: Arc<dyn ObjectStore>,
/// if true, use row selections in addition to row group selections
use_row_selections: AtomicBool,
}
impl IndexTableProvider {
/// Create a new IndexTableProvider
/// * `object_store` - the object store implementation to use for reading files
pub fn try_new(object_store: Arc<dyn ObjectStore>) -> Result<Self> {
let tmpdir = TempDir::new().expect("Can't make temporary directory");
let indexed_file =
IndexedFile::try_new(tmpdir.path().join("indexed_file.parquet"), 0..1000)?;
Ok(Self {
indexed_file,
tmpdir,
object_store,
use_row_selections: AtomicBool::new(false),
})
}
/// Set whether row selections should be used in addition to row group selections
pub fn set_use_row_selection(&self, use_row_selections: bool) {
self.use_row_selections
.store(use_row_selections, Ordering::SeqCst);
}
/// Return whether row selections should be used
pub fn use_row_selections(&self) -> bool {
self.use_row_selections.load(Ordering::SeqCst)
}
/// convert filters like `a = 1`, `b = 2`
/// to a single predicate like `a = 1 AND b = 2` suitable for execution
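///
/// A minimal sketch of the folding step by itself (column names `a` and `b`
/// are illustrative):
///
/// ```rust,ignore
/// use datafusion::prelude::{col, lit};
/// use datafusion_expr::utils::conjunction;
///
/// let filters = vec![col("a").eq(lit(1)), col("b").eq(lit(2))];
/// // `conjunction` returns `None` for an empty list,
/// // and `Some(a = 1 AND b = 2)` here
/// let predicate = conjunction(filters);
/// assert!(predicate.is_some());
/// ```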
fn filters_to_predicate(
&self,
state: &dyn Session,
filters: &[Expr],
) -> Result<Arc<dyn PhysicalExpr>> {
let df_schema = DFSchema::try_from(self.schema())?;
let predicate = conjunction(filters.to_vec());
let predicate = predicate
.map(|predicate| state.create_physical_expr(predicate, &df_schema))
.transpose()?
// if there are no filters, use a literal `true` as a predicate
// that always evaluates to true, which we can pass to the index
.unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true));
Ok(predicate)
}
/// Returns a [`ParquetAccessPlan`] that specifies how to scan the
/// parquet file.
///
/// A `ParquetAccessPlan` specifies which row groups and which rows within
/// those row groups to scan.
fn create_plan(
&self,
predicate: &Arc<dyn PhysicalExpr>,
) -> Result<ParquetAccessPlan> {
// In this example, we use the PruningPredicate's literal guarantees to
// analyze the predicate. In a real system, using
// `PruningPredicate::prune` would likely be easier to do.
let pruning_predicate =
PruningPredicate::try_new(Arc::clone(predicate), self.schema())?;
// The PruningPredicate's guarantees must all be satisfied in order for
// the predicate to possibly evaluate to true.
let guarantees = pruning_predicate.literal_guarantees();
let Some(constants) = self.value_constants(guarantees) else {
return Ok(self.indexed_file.scan_all_plan());
};
// Begin with a plan that skips all row groups.
let mut plan = self.indexed_file.scan_none_plan();
// determine which row groups have the values in the guarantees
for value in constants {
let ScalarValue::Int32(Some(val)) = value else {
// if we have unexpected type of constant, no pruning is possible
return Ok(self.indexed_file.scan_all_plan());
};
// Since we know the values in the file are in the range 0..1000 and
// evenly distributed across the row groups, calculate which row
// group this value appears in and tell the parquet reader to read it
let val = *val as usize;
let num_rows_in_row_group = 1000 / plan.len();
let row_group_index = val / num_rows_in_row_group;
plan.scan(row_group_index);
// If we want to use row selections, which the parquet reader can
// use to skip data pages when the parquet file has a "page index"
// and the reader is configured to read it, add a row selection
if self.use_row_selections() {
let offset_in_row_group = val - row_group_index * num_rows_in_row_group;
let selection = RowSelection::from(vec![
// skip rows before the desired row
RowSelector::skip(offset_in_row_group),
// select the actual row
RowSelector::select(1),
// skip any remaining rows in the group
RowSelector::skip(num_rows_in_row_group - offset_in_row_group - 1),
]);
plan.scan_selection(row_group_index, selection);
}
}
Ok(plan)
}
/// Returns the set of constants that the `"id"` column must take in order
/// for the predicate to be true.
///
/// If `None` is returned, we can't extract the necessary information from
/// the guarantees.
fn value_constants<'a>(
&self,
guarantees: &'a [LiteralGuarantee],
) -> Option<&'a HashSet<ScalarValue>> {
// only handle a guarantee on a single column in this example
if guarantees.len() != 1 {
return None;
}
let guarantee = guarantees.first()?;
// Only handle IN guarantees for the "id" column
if guarantee.guarantee != Guarantee::In || guarantee.column.name() != "id" {
return None;
}
Some(&guarantee.literals)
}
}
/// Stores information needed to scan a file
#[derive(Debug)]
struct IndexedFile {
/// File name
file_name: String,
/// The path of the file
path: PathBuf,
/// The size of the file
file_size: u64,
/// The pre-parsed parquet metadata for the file
metadata: Arc<ParquetMetaData>,
/// The arrow schema of the file
schema: SchemaRef,
}
impl IndexedFile {
fn try_new(path: impl AsRef<Path>, value_range: Range<i32>) -> Result<Self> {
let path = path.as_ref();
// write the actual file
make_demo_file(path, value_range)?;
// Now, open the file and read its size and metadata
let file_name = path
.file_name()
.ok_or_else(|| internal_datafusion_err!("Invalid path"))?
.to_str()
.ok_or_else(|| internal_datafusion_err!("Invalid filename"))?
.to_string();
let file_size = path.metadata()?.len();
let file = File::open(path).map_err(|e| {
DataFusionError::from(e).context(format!("Error opening file {path:?}"))
})?;
let options = ArrowReaderOptions::new()
// Load the page index when reading the metadata, so it is
// cached and available to interpret row selections
.with_page_index(true);
let reader =
ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
let metadata = reader.metadata().clone();
let schema = reader.schema().clone();
// canonicalize after writing the file
let path = std::fs::canonicalize(path)?;
Ok(Self {
file_name,
path,
file_size,
metadata,
schema,
})
}
/// Return a `PartitionedFile` to scan the underlying file
///
/// The returned value does not have any `ParquetAccessPlan` specified in
/// its extensions.
fn partitioned_file(&self) -> PartitionedFile {
PartitionedFile::new(self.path.display().to_string(), self.file_size)
}
/// Return a `ParquetAccessPlan` that scans all row groups in the file
fn scan_all_plan(&self) -> ParquetAccessPlan {
ParquetAccessPlan::new_all(self.metadata.num_row_groups())
}
/// Return a `ParquetAccessPlan` that scans no row groups in the file
fn scan_none_plan(&self) -> ParquetAccessPlan {
ParquetAccessPlan::new_none(self.metadata.num_row_groups())
}
}
/// Implement the TableProvider trait for IndexTableProvider
/// so that we can query it as a table.
#[async_trait]
impl TableProvider for IndexTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.indexed_file.schema)
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let indexed_file = &self.indexed_file;
let predicate = self.filters_to_predicate(state, filters)?;
// Figure out which row groups to scan based on the predicate
let access_plan = self.create_plan(&predicate)?;
println!("{access_plan:?}");
let partitioned_file = indexed_file
.partitioned_file()
// provide the starting access plan to the ParquetExec by
// storing it as "extensions" on PartitionedFile
.with_extensions(Arc::new(access_plan) as _);
// Prepare for scanning
let schema = self.schema();
let object_store_url = ObjectStoreUrl::parse("file://")?;
let file_scan_config = FileScanConfig::new(object_store_url, schema)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file);
// Configure a factory interface to avoid re-reading the metadata for each file
let reader_factory =
CachedParquetFileReaderFactory::new(Arc::clone(&self.object_store))
.with_file(indexed_file);
// Finally, put it all together into a ParquetExec
Ok(ParquetExecBuilder::new(file_scan_config)
// provide the predicate so the ParquetExec can try and prune
// row groups internally
.with_predicate(predicate)
// provide the factory to create parquet reader without re-reading metadata
.with_parquet_file_reader_factory(Arc::new(reader_factory))
.build_arc())
}
/// Tell DataFusion to push filters down to the scan method
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
// Inexact because the pruning can't handle all expressions and pruning
// is not done at the row level -- there may be rows in returned files
// that do not pass the filter
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}
}
/// A custom [`ParquetFileReaderFactory`] that handles opening parquet files
/// from object storage, and uses pre-loaded metadata.
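///
/// A minimal sketch of how the factory is wired into the scan (mirroring
/// `IndexTableProvider::scan` above; `object_store`, `indexed_file` and
/// `file_scan_config` are assumed to already be in scope):
///
/// ```rust,ignore
/// // parse the parquet footer once, then reuse it for every query
/// let factory = CachedParquetFileReaderFactory::new(Arc::clone(&object_store))
///     .with_file(&indexed_file);
/// let exec = ParquetExecBuilder::new(file_scan_config)
///     .with_parquet_file_reader_factory(Arc::new(factory))
///     .build_arc();
/// ```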
#[derive(Debug)]
struct CachedParquetFileReaderFactory {
/// The underlying object store implementation for reading file data
object_store: Arc<dyn ObjectStore>,
/// The parquet metadata for each file in the index, keyed by the file name
/// (e.g. `file1.parquet`)
metadata: HashMap<String, Arc<ParquetMetaData>>,
}
impl CachedParquetFileReaderFactory {
fn new(object_store: Arc<dyn ObjectStore>) -> Self {
Self {
object_store,
metadata: HashMap::new(),
}
}
/// Add the pre-parsed information about the file to the factory
fn with_file(mut self, indexed_file: &IndexedFile) -> Self {
self.metadata.insert(
indexed_file.file_name.clone(),
Arc::clone(&indexed_file.metadata),
);
self
}
}
impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
fn create_reader(
&self,
_partition_index: usize,
file_meta: FileMeta,
metadata_size_hint: Option<usize>,
_metrics: &ExecutionPlanMetricsSet,
) -> Result<Box<dyn AsyncFileReader + Send>> {
// for this example we ignore the partition index and metrics
// but in a real system you would likely use them to report details on
// the performance of the reader.
let filename = file_meta
.location()
.parts()
.last()
.expect("No path in location")
.as_ref()
.to_string();
let object_store = Arc::clone(&self.object_store);
let mut inner = ParquetObjectReader::new(object_store, file_meta.object_meta);
if let Some(hint) = metadata_size_hint {
inner = inner.with_footer_size_hint(hint)
};
let metadata = self
.metadata
.get(&filename)
.expect("metadata for file not found: {filename}");
Ok(Box::new(ParquetReaderWithCache {
filename,
metadata: Arc::clone(metadata),
inner,
}))
}
}
/// Wrapper around a `ParquetObjectReader` that returns cached, pre-parsed
/// metadata instead of re-reading it from the file
struct ParquetReaderWithCache {
filename: String,
metadata: Arc<ParquetMetaData>,
inner: ParquetObjectReader,
}
impl AsyncFileReader for ParquetReaderWithCache {
fn get_bytes(
&mut self,
range: Range<usize>,
) -> BoxFuture<'_, datafusion::parquet::errors::Result<Bytes>> {
println!("get_bytes: {} Reading range {:?}", self.filename, range);
self.inner.get_bytes(range)
}
fn get_byte_ranges(
&mut self,
ranges: Vec<Range<usize>>,
) -> BoxFuture<'_, datafusion::parquet::errors::Result<Vec<Bytes>>> {
println!(
"get_byte_ranges: {} Reading ranges {:?}",
self.filename, ranges
);
self.inner.get_byte_ranges(ranges)
}
fn get_metadata(
&mut self,
) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>> {
println!("get_metadata: {} returning cached metadata", self.filename);
// return the cached metadata so the parquet reader does not read it
let metadata = self.metadata.clone();
async move { Ok(metadata) }.boxed()
}
}
/// Creates a new parquet file at the specified path.
///
/// * id: Int32
/// * text: Utf8
///
/// The `id` column increases sequentially over `value_range`
/// The `text` column contains `TheTextValue{i}` for each row's `id` value `i`
///
/// Each row group has 100 rows
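///
/// With the `0..1000` range used in this example, the resulting layout is:
///
/// ```text
/// Row Group 0: ids   0..=99
/// Row Group 1: ids 100..=199
/// ...
/// Row Group 9: ids 900..=999
/// ```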
fn make_demo_file(path: impl AsRef<Path>, value_range: Range<i32>) -> Result<()> {
let path = path.as_ref();
let file = File::create(path)?;
let id = Int32Array::from_iter_values(value_range.clone());
let text =
StringArray::from_iter_values(value_range.map(|i| format!("TheTextValue{i}")));
let batch = RecordBatch::try_from_iter(vec![
("id", Arc::new(id) as ArrayRef),
("text", Arc::new(text) as ArrayRef),
])?;
let schema = batch.schema();
// enable page level statistics for the `id` column and
// chunk (per row group) level statistics for everything else
let props = WriterProperties::builder()
.set_max_row_group_size(100)
// compute column chunk (per row group) statistics by default
.set_statistics_enabled(EnabledStatistics::Chunk)
// compute column page statistics for the `id` column
.set_column_statistics_enabled(ColumnPath::from("id"), EnabledStatistics::Page)
.build();
// write the actual values to the file
let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
writer.write(&batch)?;
writer.close()?;
Ok(())
}