blob: 7d5c8c4834eadc34e9e389b058d2bc31e4a1db14 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::any::Any;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::fmt;
use std::fmt::Debug;
use std::ops::Deref;
use std::slice::from_ref;
use std::sync::Arc;
use crate::sink::DataSink;
use crate::source::{DataSource, DataSourceExec};
use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::project_orderings;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::memory::MemoryStream;
use datafusion_physical_plan::projection::{
all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
};
use datafusion_physical_plan::{
common, ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
SendableRecordBatchStream, Statistics,
};
use async_trait::async_trait;
use datafusion_physical_plan::coop::cooperative;
use datafusion_physical_plan::execution_plan::SchedulingType;
use futures::StreamExt;
use itertools::Itertools;
use tokio::sync::RwLock;
/// Data source configuration for reading in-memory batches of data.
///
/// Holds the record batches of every partition together with the optional
/// projection, the known orderings and the row limit used when reading them.
#[derive(Clone, Debug)]
pub struct MemorySourceConfig {
/// The partitions to query.
///
/// Each partition is a `Vec<RecordBatch>`.
partitions: Vec<Vec<RecordBatch>>,
/// Schema representing the data before projection
schema: SchemaRef,
/// Schema representing the data after the optional projection is applied
projected_schema: SchemaRef,
/// Optional projection: indices into `schema` of the columns to output
projection: Option<Vec<usize>>,
/// Sort information: one or more equivalent orderings
sort_information: Vec<LexOrdering>,
/// Whether `fmt_as` should display the per-partition batch counts
/// (`partition_sizes`)
show_sizes: bool,
/// The maximum number of records to read from this plan. If `None`,
/// all records after filtering are returned.
fetch: Option<usize>,
}
impl DataSource for MemorySourceConfig {
fn open(
&self,
partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(cooperative(
MemoryStream::try_new(
self.partitions[partition].clone(),
Arc::clone(&self.projected_schema),
self.projection.clone(),
)?
.with_fetch(self.fetch),
)))
}
/// Returns `self` as [`Any`] so that callers can downcast to the concrete type.
fn as_any(&self) -> &dyn Any {
self
}
/// Formats this source for plan display.
///
/// * `Default` / `Verbose`: writes `partitions=<n>`, then (only when
///   `show_sizes` is set) `partition_sizes`, i.e. the number of batches in
///   each partition, followed by the optional fetch limit, the first known
///   output ordering and the constraints.
/// * `TreeRender`: writes the format name, the total number of rows and the
///   total array memory size across all batches.
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
    match t {
        DisplayFormatType::Default | DisplayFormatType::Verbose => {
            let partition_sizes: Vec<_> =
                self.partitions.iter().map(|b| b.len()).collect();
            let output_ordering = self
                .sort_information
                .first()
                .map(|output_ordering| format!(", output_ordering={output_ordering}"))
                .unwrap_or_default();
            let eq_properties = self.eq_properties();
            let constraints = eq_properties.constraints();
            let constraints = if constraints.is_empty() {
                String::new()
            } else {
                format!(", {constraints}")
            };
            let limit = self
                .fetch
                .map_or(String::new(), |limit| format!(", fetch={limit}"));
            if self.show_sizes {
                write!(
                    f,
                    "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
                    partition_sizes.len(),
                )
            } else {
                write!(
                    f,
                    "partitions={}{limit}{output_ordering}{constraints}",
                    partition_sizes.len(),
                )
            }
        }
        DisplayFormatType::TreeRender => {
            // Count rows (not batches): `Vec::len` on a partition would only
            // give the number of batches it holds.
            let (total_rows, total_bytes) = self.partitions.iter().flatten().fold(
                (0usize, 0usize),
                |(rows, bytes), batch| {
                    (
                        rows + batch.num_rows(),
                        bytes + batch.get_array_memory_size(),
                    )
                },
            );
            writeln!(f, "format=memory")?;
            writeln!(f, "rows={total_rows}")?;
            writeln!(f, "bytes={total_bytes}")?;
            Ok(())
        }
    }
}
/// If possible, redistribute batches across partitions according to their size.
///
/// Returns `Ok(None)` if unable to repartition. Preserve output ordering if exists.
/// Refer to [`DataSource::repartitioned`] for further details.
fn repartitioned(
&self,
target_partitions: usize,
_repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
) -> Result<Option<Arc<dyn DataSource>>> {
if self.partitions.is_empty() || self.partitions.len() >= target_partitions
// if have no partitions, or already have more partitions than desired, do not repartition
{
return Ok(None);
}
let maybe_repartitioned = if let Some(output_ordering) = output_ordering {
self.repartition_preserving_order(target_partitions, output_ordering)?
} else {
self.repartition_evenly_by_size(target_partitions)?
};
if let Some(repartitioned) = maybe_repartitioned {
Ok(Some(Arc::new(Self::try_new(
&repartitioned,
self.original_schema(),
self.projection.clone(),
)?)))
} else {
Ok(None)
}
}
/// Reports an unknown partitioning scheme with one output partition per
/// stored partition.
fn output_partitioning(&self) -> Partitioning {
    let partition_count = self.partitions.len();
    Partitioning::UnknownPartitioning(partition_count)
}
/// Builds the equivalence properties from the projected schema and the
/// known sort information.
fn eq_properties(&self) -> EquivalenceProperties {
    let schema = Arc::clone(&self.projected_schema);
    let orderings = self.sort_information.clone();
    EquivalenceProperties::new_with_orderings(schema, orderings)
}
/// Always reports [`SchedulingType::Cooperative`]; `open` wraps its stream
/// with `cooperative`.
fn scheduling_type(&self) -> SchedulingType {
SchedulingType::Cooperative
}
/// Computes statistics from the stored record batches.
///
/// With `Some(partition)` only that partition's batches are considered; an
/// out-of-range index yields unknown statistics for the projected schema.
/// With `None` the statistics cover all partitions.
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
    let statistics = match partition {
        // Statistics across all partitions
        None => common::compute_record_batch_statistics(
            &self.partitions,
            &self.schema,
            self.projection.clone(),
        ),
        Some(index) => match self.partitions.get(index) {
            // Statistics for one specific partition
            Some(batches) => common::compute_record_batch_statistics(
                from_ref(batches),
                &self.schema,
                self.projection.clone(),
            ),
            // Invalid partition index
            None => Statistics::new_unknown(&self.projected_schema),
        },
    };
    Ok(statistics)
}
/// Returns a copy of this source whose `fetch` is replaced by `limit`.
/// Always returns `Some`.
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
    Some(Arc::new(self.clone().with_limit(limit)))
}
/// Returns the configured row limit, if any.
fn fetch(&self) -> Option<usize> {
self.fetch
}
/// Tries to absorb `projection` into this source.
///
/// Returns `Ok(None)` unless every projection expression is an alias-free
/// column; otherwise returns a new source over the same partitions and
/// original schema with the combined column projection.
fn try_swapping_with_projection(
    &self,
    projection: &[ProjectionExpr],
) -> Result<Option<Arc<dyn DataSource>>> {
    // If there is any non-column or alias-carrier expression, Projection should not be removed.
    // This process can be moved into MemoryExec, but it would be an overlap of their responsibility.
    if !all_alias_free_columns(projection) {
        return Ok(None);
    }
    // Without an existing projection every column of the schema is selected.
    let all_projections = (0..self.schema.fields().len()).collect();
    let new_projections = new_projections_for_columns(
        projection,
        self.projection().as_ref().unwrap_or(&all_projections),
    );
    let swapped = MemorySourceConfig::try_new(
        self.partitions(),
        self.original_schema(),
        Some(new_projections),
    )?;
    Ok(Some(Arc::new(swapped)))
}
}
impl MemorySourceConfig {
/// Builds a `MemorySourceConfig` over the given in-memory partitions.
///
/// `schema` must be the schema *before* projection; the projected schema is
/// derived from it and `projection`. The new config has no sort
/// information, no fetch limit, and displays partition sizes.
///
/// # Errors
/// Fails when the projected schema cannot be computed.
pub fn try_new(
    partitions: &[Vec<RecordBatch>],
    schema: SchemaRef,
    projection: Option<Vec<usize>>,
) -> Result<Self> {
    project_schema(&schema, projection.as_ref()).map(|projected_schema| Self {
        partitions: partitions.to_vec(),
        schema,
        projected_schema,
        projection,
        sort_information: Vec::new(),
        show_sizes: true,
        fetch: None,
    })
}
/// Builds a `DataSourceExec` plan that reads the given in-memory partitions.
///
/// `schema` must be the schema *before* projection. See [`Self::try_new`]
/// for the failure conditions.
pub fn try_new_exec(
    partitions: &[Vec<RecordBatch>],
    schema: SchemaRef,
    projection: Option<Vec<usize>>,
) -> Result<Arc<DataSourceExec>> {
    let config = Self::try_new(partitions, schema, projection)?;
    let exec = DataSourceExec::from_data_source(config);
    Ok(exec)
}
/// Create a new execution plan from a list of constant values (`ValuesExec`)
///
/// `data` is row-major: `data[i][j]` is the expression for row `i`,
/// column `j` of `schema`. Every expression is evaluated once against a
/// one-row placeholder batch and the results are assembled into a single
/// record batch.
///
/// # Errors
/// Returns a plan error when `data` is empty, when a row has fewer
/// expressions than the schema has columns, or when an expression evaluates
/// to an array with more than one element. Evaluation and batch
/// construction errors are propagated.
pub fn try_new_as_values(
    schema: SchemaRef,
    data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
) -> Result<Arc<DataSourceExec>> {
    if data.is_empty() {
        return plan_err!("Values list cannot be empty");
    }
    let n_row = data.len();
    let n_col = schema.fields().len();
    // Reject short rows up front; indexing them below would panic.
    if let Some((row_idx, row)) = data
        .iter()
        .enumerate()
        .find(|(_, row)| row.len() < n_col)
    {
        return plan_err!(
            "Values list row {} has {} expressions, but the schema has {} columns",
            row_idx,
            row.len(),
            n_col
        );
    }
    // We have this single row batch as a placeholder to satisfy evaluation argument
    // and generate a single output row
    let placeholder_schema = Arc::new(Schema::empty());
    let placeholder_batch = RecordBatch::try_new_with_options(
        Arc::clone(&placeholder_schema),
        vec![],
        &RecordBatchOptions::new().with_row_count(Some(1)),
    )?;
    // Evaluate each column
    let arrays = (0..n_col)
        .map(|j| {
            (0..n_row)
                .map(|i| {
                    let expr = &data[i][j];
                    let result = expr.evaluate(&placeholder_batch)?;
                    match result {
                        ColumnarValue::Scalar(scalar) => Ok(scalar),
                        ColumnarValue::Array(array) if array.len() == 1 => {
                            ScalarValue::try_from_array(&array, 0)
                        }
                        ColumnarValue::Array(_) => {
                            plan_err!("Cannot have array values in a values list")
                        }
                    }
                })
                .collect::<Result<Vec<_>>>()
                .and_then(ScalarValue::iter_to_array)
        })
        .collect::<Result<Vec<_>>>()?;
    let batch = RecordBatch::try_new_with_options(
        Arc::clone(&schema),
        arrays,
        &RecordBatchOptions::new().with_row_count(Some(n_row)),
    )?;
    Self::try_new_from_batches(schema, vec![batch])
}
/// Builds a single-partition `DataSourceExec` from `batches`.
///
/// # Errors
/// Returns a plan error when `batches` is empty or when any batch has a
/// schema different from `schema`.
pub fn try_new_from_batches(
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
) -> Result<Arc<DataSourceExec>> {
    if batches.is_empty() {
        return plan_err!("Values list cannot be empty");
    }
    // Look for the first batch whose schema disagrees with the expected one.
    let mismatch = batches
        .iter()
        .map(|batch| batch.schema())
        .find(|batch_schema| *batch_schema != schema);
    if let Some(batch_schema) = mismatch {
        return plan_err!(
            "Batch has invalid schema. Expected: {}, got: {}",
            schema,
            batch_schema
        );
    }
    let projected_schema = Arc::clone(&schema);
    let source = Self {
        partitions: vec![batches],
        schema,
        projected_schema,
        projection: None,
        sort_information: Vec::new(),
        show_sizes: true,
        fetch: None,
    };
    Ok(DataSourceExec::from_data_source(source))
}
/// Set the limit of the files
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.fetch = limit;
self
}
/// Set `show_sizes` to determine whether to display partition sizes
pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
self.show_sizes = show_sizes;
self
}
/// Borrows the stored partitions, each a list of record batches.
pub fn partitions(&self) -> &[Vec<RecordBatch>] {
    self.partitions.as_slice()
}
/// Borrows the optional projection (column indices into the original schema).
pub fn projection(&self) -> &Option<Vec<usize>> {
&self.projection
}
/// Returns whether partition sizes are shown when this source is displayed.
pub fn show_sizes(&self) -> bool {
self.show_sizes
}
/// Borrows the known equivalent orderings of this source.
pub fn sort_information(&self) -> &[LexOrdering] {
    self.sort_information.as_slice()
}
/// Attaches one or more equivalent orderings to this source.
///
/// A memory table can be ordered by multiple expressions simultaneously.
/// [`EquivalenceProperties`] keeps track of expressions that describe the
/// global ordering of the schema. These columns are not necessarily same; e.g.
/// ```text
/// ┌-------┐
/// | a | b |
/// |---|---|
/// | 1 | 9 |
/// | 2 | 8 |
/// | 3 | 7 |
/// | 5 | 5 |
/// └---┴---┘
/// ```
/// where both `a ASC` and `b DESC` can describe the table ordering. With
/// [`EquivalenceProperties`], we can keep track of these equivalences
/// and treat `a ASC` and `b DESC` as the same ordering requirement.
///
/// Note that if there is an internal projection, that projection will be
/// also applied to the given `sort_information`.
///
/// # Errors
/// Returns an internal error when a sort expression references a column
/// whose index/name pair does not exist in the original schema.
pub fn try_with_sort_information(
    mut self,
    sort_information: Vec<LexOrdering>,
) -> Result<Self> {
    // All sort expressions must refer to the original schema
    let fields = self.schema.fields();
    let unknown_column = sort_information
        .iter()
        .flat_map(|ordering| ordering.clone())
        .flat_map(|sort_expr| collect_columns(&sort_expr.expr))
        .find(|col| {
            // A column is valid only if its index exists and the names agree.
            !fields
                .get(col.index())
                .is_some_and(|field| field.name() == col.name())
        });
    if let Some(col) = unknown_column {
        return internal_err!(
            "Column {:?} is not found in the original schema of the MemorySourceConfig",
            col
        );
    }
    // If there is a projection on the source, we also need to project orderings
    self.sort_information = if self.projection.is_some() {
        project_orderings(&sort_information, &self.projected_schema)
    } else {
        sort_information
    };
    Ok(self)
}
/// Returns a new `Arc` handle to the original (pre-projection) schema.
pub fn original_schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
/// Repartition while preserving order.
///
/// Returns `Ok(None)` if cannot fulfill the requested repartitioning, such
/// as having too few batches to fulfill the `target_partitions` or if unable
/// to preserve output ordering.
fn repartition_preserving_order(
&self,
target_partitions: usize,
output_ordering: LexOrdering,
) -> Result<Option<Vec<Vec<RecordBatch>>>> {
if !self.eq_properties().ordering_satisfy(output_ordering)? {
Ok(None)
} else {
let total_num_batches =
self.partitions.iter().map(|b| b.len()).sum::<usize>();
if total_num_batches < target_partitions {
// no way to create the desired repartitioning
return Ok(None);
}
let cnt_to_repartition = target_partitions - self.partitions.len();
// Label the current partitions and their order.
// Such that when we later split up the partitions into smaller sizes, we are maintaining the order.
let to_repartition = self
.partitions
.iter()
.enumerate()
.map(|(idx, batches)| RePartition {
idx: idx + (cnt_to_repartition * idx), // make space in ordering for split partitions
row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
batches: batches.clone(),
})
.collect_vec();
// Put all of the partitions into a heap ordered by `RePartition::partial_cmp`, which sizes
// by count of rows.
let mut max_heap = BinaryHeap::with_capacity(target_partitions);
for rep in to_repartition {
max_heap.push(CompareByRowCount(rep));
}
// Split the largest partitions into smaller partitions. Maintaining the output
// order of the partitions & newly created partitions.
let mut cannot_split_further = Vec::with_capacity(target_partitions);
for _ in 0..cnt_to_repartition {
// triggers loop for the cnt_to_repartition. So if need another 4 partitions, it attempts to split 4 times.
loop {
// Take the largest item off the heap, and attempt to split.
let Some(to_split) = max_heap.pop() else {
// Nothing left to attempt repartition. Break inner loop.
break;
};
// Split the partition. The new partitions will be ordered with idx and idx+1.
let mut new_partitions = to_split.into_inner().split();
if new_partitions.len() > 1 {
for new_partition in new_partitions {
max_heap.push(CompareByRowCount(new_partition));
}
// Successful repartition. Break inner loop, and return to outer `cnt_to_repartition` loop.
break;
} else {
cannot_split_further.push(new_partitions.remove(0));
}
}
}
let mut partitions = max_heap
.drain()
.map(CompareByRowCount::into_inner)
.collect_vec();
partitions.extend(cannot_split_further);
// Finally, sort all partitions by the output ordering.
// This was the original ordering of the batches within the partition. We are maintaining this ordering.
partitions.sort_by_key(|p| p.idx);
let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();
Ok(Some(partitions))
}
}
/// Repartition into evenly sized chunks (as much as possible without batch splitting),
/// disregarding any ordering.
///
/// Current implementation uses a first-fit-decreasing bin packing, modified to enable
/// us to still return the desired count of `target_partitions`.
///
/// Returns `Ok(None)` if cannot fulfill the requested repartitioning, such
/// as having too few batches to fulfill the `target_partitions`.
fn repartition_evenly_by_size(
    &self,
    target_partitions: usize,
) -> Result<Option<Vec<Vec<RecordBatch>>>> {
    // Determine if we have enough total batches to fulfill the request.
    // Only the batches themselves are cloned, not the nested vectors.
    let mut flatten_batches = self.partitions.iter().flatten().cloned().collect_vec();
    if flatten_batches.len() < target_partitions {
        return Ok(None);
    }
    // Take all flattened batches (all in 1 partition/vec) and divide evenly into
    // the desired number of `target_partitions`.
    let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
    // sort by size, so we pack multiple smaller batches into the same partition
    flatten_batches.sort_by_key(|b| std::cmp::Reverse(b.num_rows()));
    // Divide.
    let mut partitions = vec![Vec::new(); target_partitions];
    let mut target_partition_size = total_num_rows.div_ceil(target_partitions);
    let mut total_rows_seen = 0;
    let mut curr_bin_row_count = 0;
    let mut idx = 0;
    for batch in flatten_batches {
        let row_cnt = batch.num_rows();
        // Once every bin has been started, the remainder goes into the last one.
        idx = std::cmp::min(idx, target_partitions - 1);
        partitions[idx].push(batch);
        curr_bin_row_count += row_cnt;
        total_rows_seen += row_cnt;
        if curr_bin_row_count >= target_partition_size {
            idx += 1;
            curr_bin_row_count = 0;
            // update target_partition_size, to handle very lopsided batch distributions
            // while still returning the count of `target_partitions`
            if total_rows_seen < total_num_rows {
                target_partition_size = (total_num_rows - total_rows_seen)
                    .div_ceil(target_partitions - idx);
            }
        }
    }
    Ok(Some(partitions))
}
}
/// For use in repartitioning, track the total size and original partition index.
///
/// Do not implement clone, in order to avoid unnecessary copying during repartitioning.
struct RePartition {
/// Position of the partition in the output ordering; partitions are
/// sorted by this value once splitting is finished.
idx: usize,
/// Total number of rows in `batches`, for use in heap ordering
/// (a.k.a. splitting up the largest partitions).
row_count: usize,
/// A partition containing record batches.
batches: Vec<RecordBatch>,
}
impl RePartition {
    /// Split [`RePartition`] into 2 pieces, consuming self.
    ///
    /// The batches keep their order: a leading run goes to the first piece
    /// (labelled `idx`) and the rest to the second piece (labelled `idx + 1`).
    /// The leading run ends once it holds at least half of the rows, but both
    /// pieces always receive at least one batch.
    ///
    /// Returns only 1 partition if cannot be split further, i.e. when it
    /// holds fewer than two batches.
    fn split(self) -> Vec<Self> {
        let num_batches = self.batches.len();
        if num_batches <= 1 {
            return vec![self];
        }
        let split_pt = self.row_count / 2;
        // Count the leading batches needed to reach half of the rows.
        let mut rows_seen = 0;
        let mut leading = 0;
        for batch in &self.batches {
            if rows_seen >= split_pt {
                break;
            }
            rows_seen += batch.num_rows();
            leading += 1;
        }
        // Never hand out an empty piece: e.g. rows [1, 1, 100] would otherwise
        // put every batch into the first piece.
        let leading = leading.clamp(1, num_batches - 1);

        let mut new_0 = RePartition {
            idx: self.idx, // output ordering
            row_count: 0,
            batches: Vec::with_capacity(leading),
        };
        let mut new_1 = RePartition {
            idx: self.idx + 1, // output ordering +1
            row_count: 0,
            batches: Vec::with_capacity(num_batches - leading),
        };
        for (pos, batch) in self.batches.into_iter().enumerate() {
            if pos < leading {
                new_0.add_batch(batch);
            } else {
                new_1.add_batch(batch);
            }
        }
        vec![new_0, new_1]
    }

    /// Appends `batch` and adds its rows to the running row count.
    fn add_batch(&mut self, batch: RecordBatch) {
        self.row_count += batch.num_rows();
        self.batches.push(batch);
    }
}
/// Compact summary: row count, batch count and ordering index.
impl fmt::Display for RePartition {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            idx,
            row_count,
            batches,
        } = self;
        write!(
            f,
            "{}rows-in-{}batches@{}",
            row_count,
            batches.len(),
            idx
        )
    }
}
/// Wrapper that orders a [`RePartition`] by its row count only, so that a
/// `BinaryHeap` pops the largest partition first.
struct CompareByRowCount(RePartition);

impl CompareByRowCount {
    /// Unwraps the inner [`RePartition`].
    fn into_inner(self) -> RePartition {
        let Self(inner) = self;
        inner
    }
}

impl Ord for CompareByRowCount {
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.0.row_count, &other.0.row_count)
    }
}

impl PartialOrd for CompareByRowCount {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl PartialEq for CompareByRowCount {
    fn eq(&self, other: &Self) -> bool {
        // PartialEq must be consistent with PartialOrd
        matches!(self.cmp(other), Ordering::Equal)
    }
}

impl Eq for CompareByRowCount {}

impl Deref for CompareByRowCount {
    type Target = RePartition;

    fn deref(&self) -> &Self::Target {
        let Self(inner) = self;
        inner
    }
}
/// Type alias for partition data: the record batches of one partition,
/// shared behind an async read-write lock.
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
/// A [`DataSink`] implementation for writing to a [`MemTable`]
///
/// [`MemTable`]: <https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html>
pub struct MemSink {
/// Target locations for writing data
batches: Vec<PartitionData>,
/// Schema returned by `DataSink::schema`
schema: SchemaRef,
}
/// Debug output shows only the number of target partitions, not their data.
impl Debug for MemSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let num_partitions = self.batches.len();
        let mut builder = f.debug_struct("MemSink");
        builder.field("num_partitions", &num_partitions);
        builder.finish()
    }
}
/// Plan display: the partition count for the default and verbose formats,
/// nothing for the tree renderer.
impl DisplayAs for MemSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::TreeRender => {
                // TODO: collect info
                write!(f, "")
            }
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let partition_count = self.batches.len();
                write!(f, "MemoryTable (partitions={partition_count})")
            }
        }
    }
}
impl MemSink {
    /// Creates a new [`MemSink`] writing into the given partitions.
    ///
    /// # Errors
    /// Returns a plan error when `batches` is empty, since there must be at
    /// least one partition to insert into.
    pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
        if !batches.is_empty() {
            Ok(Self { batches, schema })
        } else {
            plan_err!("Cannot insert into MemTable with zero partitions")
        }
    }
}
#[async_trait]
impl DataSink for MemSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Schema of the data accepted by this sink.
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Drains `data` and appends its batches to the target partitions.
    ///
    /// Incoming batches are first buffered round-robin, one buffer per
    /// target partition, and only written once the input stream is
    /// exhausted; a stream error is returned before anything is written.
    /// Returns the total number of rows received.
    async fn write_all(
        &self,
        mut data: SendableRecordBatchStream,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        let num_partitions = self.batches.len();
        // buffer up the data round robin style into num_partitions
        let mut buffered = vec![vec![]; num_partitions];
        let mut next_slot = 0;
        let mut total_rows = 0;
        while let Some(batch) = data.next().await.transpose()? {
            total_rows += batch.num_rows();
            buffered[next_slot].push(batch);
            next_slot = (next_slot + 1) % num_partitions;
        }
        // write the outputs into the batches
        for (target, mut pending) in self.batches.iter().zip(buffered) {
            // Append all the new batches in one go to minimize locking overhead
            let mut guard = target.write().await;
            guard.append(&mut pending);
        }
        Ok(total_rows as u64)
    }
}
#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    use datafusion_physical_plan::ExecutionPlan;

    /// Two orderings given to the source must both be tracked as equivalent,
    /// and the reported output ordering is their concatenation.
    #[test]
    fn test_memory_order_eq() -> Result<()> {
        let fields = ["a", "b", "c"]
            .into_iter()
            .map(|name| Field::new(name, DataType::Int64, false))
            .collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(fields));

        // Sort expression on the named column with default options.
        let sort_on = |name: &str| -> Result<PhysicalSortExpr> {
            Ok(PhysicalSortExpr {
                expr: col(name, &schema)?,
                options: SortOptions::default(),
            })
        };
        let sort1: LexOrdering = [sort_on("a")?, sort_on("b")?].into();
        let sort2: LexOrdering = [sort_on("c")?].into();

        let mut expected_output_order = sort1.clone();
        expected_output_order.extend(sort2.clone());

        let source = MemorySourceConfig::try_new(&[vec![]], schema, None)?
            .try_with_sort_information(vec![sort1.clone(), sort2.clone()])?;
        let mem_exec = DataSourceExec::from_data_source(source);

        assert_eq!(
            mem_exec.properties().output_ordering().unwrap(),
            &expected_output_order
        );
        let eq_properties = mem_exec.properties().equivalence_properties();
        assert!(eq_properties.oeq_class().contains(&sort1));
        assert!(eq_properties.oeq_class().contains(&sort2));
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::col;
use crate::tests::{aggr_test_schema, make_partition};
use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field};
use datafusion_common::assert_batches_eq;
use datafusion_common::stats::{ColumnStatistics, Precision};
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::expressions::lit;
use datafusion_physical_plan::ExecutionPlan;
use futures::StreamExt;
/// A fetch limit of 4 must cut the output to the first four rows.
#[tokio::test]
async fn exec_with_limit() -> Result<()> {
    let partition = make_partition(7);
    let schema = partition.schema();
    let exec = MemorySourceConfig::try_new_from_batches(
        schema,
        vec![partition.clone(), partition],
    )
    .unwrap();
    assert_eq!(exec.fetch(), None);

    let limited = exec.with_fetch(Some(4)).unwrap();
    assert_eq!(limited.fetch(), Some(4));

    let mut stream = limited.execute(0, Arc::new(TaskContext::default()))?;
    let mut results = Vec::new();
    while let Some(batch) = stream.next().await {
        results.push(batch?);
    }

    let expected = [
        "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
    ];
    assert_batches_eq!(expected, &results);
    Ok(())
}
/// An empty values list must be rejected.
#[tokio::test]
async fn values_empty_case() -> Result<()> {
    let result = MemorySourceConfig::try_new_as_values(aggr_test_schema(), vec![]);
    assert!(result.is_err());
    Ok(())
}
/// Batches matching the schema are accepted.
#[test]
fn new_exec_with_batches() {
    let partition = make_partition(7);
    let schema = partition.schema();
    let _exec = MemorySourceConfig::try_new_from_batches(
        schema,
        vec![partition.clone(), partition],
    )
    .unwrap();
}
/// An empty batch list is rejected.
#[test]
fn new_exec_with_batches_empty() {
    let schema = make_partition(7).schema();
    let no_batches = Vec::new();
    let _ = MemorySourceConfig::try_new_from_batches(schema, no_batches).unwrap_err();
}
/// Batches whose schema differs from the declared one are rejected.
#[test]
fn new_exec_with_batches_invalid_schema() {
    let partition = make_partition(7);
    let batches = vec![partition.clone(), partition];
    let fields = vec![
        Field::new("col0", DataType::UInt32, false),
        Field::new("col1", DataType::Utf8, false),
    ];
    let invalid_schema = Arc::new(Schema::new(fields));
    let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
        .unwrap_err();
}
// Test issue: https://github.com/apache/datafusion/issues/8763
/// A non-nullable column accepts a value but rejects a null.
#[test]
fn new_exec_with_non_nullable_schema() {
    let field = Field::new("col0", DataType::UInt32, false);
    let schema = Arc::new(Schema::new(vec![field]));

    let valid_rows = vec![vec![lit(1u32)]];
    let _ = MemorySourceConfig::try_new_as_values(Arc::clone(&schema), valid_rows)
        .unwrap();

    // Test that a null value is rejected
    let null_rows = vec![vec![lit(ScalarValue::UInt32(None))]];
    let _ = MemorySourceConfig::try_new_as_values(schema, null_rows).unwrap_err();
}
/// Statistics of an all-null values list report every row as null.
#[test]
fn values_stats_with_nulls_only() -> Result<()> {
    let rows = 3;
    let data = (0..rows)
        .map(|_| vec![lit(ScalarValue::Null)])
        .collect::<Vec<_>>();
    let schema = Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)]));
    let values = MemorySourceConfig::try_new_as_values(schema, data)?;

    let expected = Statistics {
        num_rows: Precision::Exact(rows),
        total_byte_size: Precision::Exact(8), // not important
        column_statistics: vec![ColumnStatistics {
            null_count: Precision::Exact(rows), // there are only nulls
            distinct_count: Precision::Absent,
            max_value: Precision::Absent,
            min_value: Precision::Absent,
            sum_value: Precision::Absent,
        }],
    };
    assert_eq!(values.partition_statistics(None)?, expected);
    Ok(())
}
/// Builds a three-column batch (`a`: Int32, `b`: Utf8, `c`: Int64) with
/// `row_size` identical rows.
fn batch(row_size: usize) -> RecordBatch {
    let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
    let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
    let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
    let columns = vec![("a", a), ("b", b), ("c", c)];
    RecordBatch::try_from_iter(columns).unwrap()
}
/// Schema of the batches produced by `batch`.
fn schema() -> SchemaRef {
    let sample = batch(1);
    sample.schema()
}
/// Source without any partition.
fn memorysrcconfig_no_partitions(
    sort_information: Vec<LexOrdering>,
) -> Result<MemorySourceConfig> {
    let config = MemorySourceConfig::try_new(&[], schema(), None)?;
    config.try_with_sort_information(sort_information)
}
/// Source with a single partition holding one 100-row batch.
fn memorysrcconfig_1_partition_1_batch(
    sort_information: Vec<LexOrdering>,
) -> Result<MemorySourceConfig> {
    let config = MemorySourceConfig::try_new(&[vec![batch(100)]], schema(), None)?;
    config.try_with_sort_information(sort_information)
}
/// Source with three partitions, each holding one 100-row batch.
fn memorysrcconfig_3_partitions_1_batch_each(
    sort_information: Vec<LexOrdering>,
) -> Result<MemorySourceConfig> {
    let partitions = (0..3).map(|_| vec![batch(100)]).collect::<Vec<_>>();
    let config = MemorySourceConfig::try_new(&partitions, schema(), None)?;
    config.try_with_sort_information(sort_information)
}
/// Source with three partitions, each holding two 100-row batches.
fn memorysrcconfig_3_partitions_with_2_batches_each(
    sort_information: Vec<LexOrdering>,
) -> Result<MemorySourceConfig> {
    let partitions = (0..3)
        .map(|_| vec![batch(100), batch(100)])
        .collect::<Vec<_>>();
    let config = MemorySourceConfig::try_new(&partitions, schema(), None)?;
    config.try_with_sort_information(sort_information)
}
/// One partition whose batches have different sizes and are stored in
/// decreasing size order (100_000, 10_000, 100, 1).
fn memorysrcconfig_1_partition_with_different_sized_batches(
    sort_information: Vec<LexOrdering>,
) -> Result<MemorySourceConfig> {
    let batches = vec![batch(100_000), batch(10_000), batch(100), batch(1)];
    let config = MemorySourceConfig::try_new(&[batches], schema(), None)?;
    config.try_with_sort_information(sort_information)
}
/// Same batch sizes as [`memorysrcconfig_1_partition_with_different_sized_batches`],
/// but stored in an order that does not follow their size
/// (100_000, 1, 100, 10_000).
fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
    sort_information: Vec<LexOrdering>,
) -> Result<MemorySourceConfig> {
    let batches = vec![batch(100_000), batch(1), batch(100), batch(10_000)];
    let config = MemorySourceConfig::try_new(&[batches], schema(), None)?;
    config.try_with_sort_information(sort_information)
}
/// Two partitions with batches of different sizes.
fn memorysrcconfig_2_partition_with_different_sized_batches(
    sort_information: Vec<LexOrdering>,
) -> Result<MemorySourceConfig> {
    let first = vec![batch(100_000), batch(10_000), batch(1_000)];
    let second = vec![batch(2_000), batch(20)];
    let config = MemorySourceConfig::try_new(&[first, second], schema(), None)?;
    config.try_with_sort_information(sort_information)
}
/// Two partitions with extremely lopsided batch sizes, including empty batches.
fn memorysrcconfig_2_partition_with_extreme_sized_batches(
    sort_information: Vec<LexOrdering>,
) -> Result<MemorySourceConfig> {
    let first = vec![
        batch(100_000),
        batch(1),
        batch(1),
        batch(1),
        batch(1),
        batch(0),
    ];
    let second = vec![batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)];
    let config = MemorySourceConfig::try_new(&[first, second], schema(), None)?;
    config.try_with_sort_information(sort_information)
}
/// Checks the outcome of a repartitioning attempt.
///
/// With `Some(n)` the repartitioned source must exist and report `n`
/// partitions; with `None` the [`DataSource::repartitioned`] call is
/// expected to have returned `None`.
fn assert_partitioning(
    partitioned_datasrc: Option<Arc<dyn DataSource>>,
    partition_cnt: Option<usize>,
) {
    let should_exist = match partition_cnt {
        Some(partition_cnt) => {
            format!("new datasource should exist and have {partition_cnt:?} partitions")
        }
        None => "new datasource should not exist".into(),
    };

    let actual = partitioned_datasrc
        .map(|datasrc| datasrc.output_partitioning().partition_count());
    assert_eq!(
        actual,
        partition_cnt,
        "partitioned datasrc does not match expected, we expected {should_exist}, instead found {actual:?}"
    );
}
/// Runs the repartitioning scenarios shared by every sort configuration.
///
/// Each scenario builds a fresh source from one of the `memorysrcconfig_*`
/// fixtures (handing it `sort_information_on_config`), asks it to repartition
/// towards a target partition count using `output_ordering`, and checks via
/// `assert_partitioning` whether a new source was produced and, if so, how
/// many partitions it reports. `None` means "no new source was returned".
///
/// The last scenario additionally inspects the batch layout of the
/// repartitioned source.
fn run_all_test_scenarios(
    output_ordering: Option<LexOrdering>,
    sort_information_on_config: Vec<LexOrdering>,
) -> Result<()> {
    // Value handed to the second `repartitioned` argument; these scenarios never vary it.
    let ignored = usize::MAX;

    // Source has no partitions.
    assert_partitioning(
        memorysrcconfig_no_partitions(sort_information_on_config.clone())?
            .repartitioned(1, ignored, output_ordering.clone())?,
        None,
    );

    // Source partitions == target partitions (= 1).
    assert_partitioning(
        memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?
            .repartitioned(1, ignored, output_ordering.clone())?,
        None,
    );

    // Source partitions == target partitions (= 3).
    assert_partitioning(
        memorysrcconfig_3_partitions_1_batch_each(sort_information_on_config.clone())?
            .repartitioned(3, ignored, output_ordering.clone())?,
        None,
    );

    // Source partitions > target partitions: partitions are not merged.
    assert_partitioning(
        memorysrcconfig_3_partitions_1_batch_each(sort_information_on_config.clone())?
            .repartitioned(2, ignored, output_ordering.clone())?,
        None,
    );

    // Source partitions < target partitions, but there are not enough batches
    // (per partition) to split into more partitions.
    assert_partitioning(
        memorysrcconfig_3_partitions_1_batch_each(sort_information_on_config.clone())?
            .repartitioned(4, ignored, output_ordering.clone())?,
        None,
    );

    // Source partitions < target partitions and a sufficient split is possible:
    // 6 batches across 3 partitions, so 2 of its partitions must be split.
    assert_partitioning(
        memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?
        .repartitioned(5, ignored, output_ordering.clone())?,
        Some(5),
    );

    // Same source, but now all of its partitions must be split.
    assert_partitioning(
        memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?
        .repartitioned(6, ignored, output_ordering.clone())?,
        Some(6),
    );

    // Source partitions < target partitions, but the total number of batches
    // cannot satisfy the requested target.
    assert_partitioning(
        memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?
        .repartitioned(3 * 2 + 1, ignored, output_ordering.clone())?,
        None,
    );

    // Source has 1 partition with many batches of lopsided sizes; make sure
    // the split is handled properly.
    let lopsided = memorysrcconfig_1_partition_with_different_sized_batches(
        sort_information_on_config,
    )?
    .repartitioned(2, ignored, output_ordering)?;
    assert_partitioning(lopsided.clone(), Some(2));

    // Starting = batch(100_000), batch(10_000), batch(100), batch(1).
    // Expected split: p1=batch(100_000), p2=[batch(10_000), batch(100), batch(1)].
    let lopsided = lopsided.unwrap();
    let Some(repartitioned_config) =
        lopsided.as_any().downcast_ref::<MemorySourceConfig>()
    else {
        unreachable!()
    };
    let row_counts = repartitioned_config
        .partitions
        .iter()
        .map(|partition| {
            partition
                .iter()
                .map(|batch| batch.num_rows())
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();
    let expected: Vec<Vec<usize>> = vec![vec![100_000], vec![10_000, 100, 1]];
    assert_eq!(row_counts, expected);

    Ok(())
}
/// Without sort information or an output ordering, repartitioning is free to
/// mix batches from different input partitions and to reorder them.
#[test]
fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> {
    // Common set of functionality, with no sort information and no output ordering.
    run_all_test_scenarios(None, vec![])?;

    // How the no-sort-order case divides differently:
    // * it does not preserve separate partitions (with their own internal
    //   ordering) on an even split,
    // * nor does it preserve ordering (batch(2_000) ends up before batch(1_000)).
    let target = 3;
    let config = memorysrcconfig_2_partition_with_different_sized_batches(vec![])?;
    assert_partitioning(
        config.clone().repartitioned(target, usize::MAX, None)?,
        Some(3),
    );

    // Starting = batch(100_000), batch(10_000), batch(1_000), batch(2_000), batch(20)
    // Expected split: p1=batch(100_000), p2=batch(10_000),
    // p3=[batch(2_000), batch(1_000), batch(20)] (mixed across original partitions).
    let row_counts = config
        .repartition_evenly_by_size(target)?
        .unwrap()
        .iter()
        .map(|partition| {
            partition
                .iter()
                .map(|batch| batch.num_rows())
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();
    let expected: Vec<Vec<usize>> =
        vec![vec![100_000], vec![10_000], vec![2_000, 1_000, 20]];
    assert_eq!(row_counts, expected);

    Ok(())
}
/// Even-by-size repartitioning of two partitions whose batches have extremely
/// lopsided row counts, with no sort information and no output ordering.
#[test]
fn test_repartition_no_sort_information_no_output_ordering_lopsized_batches(
) -> Result<()> {
    // Per the fixture's description, the input has two partitions:
    //     partition 1: b(100_000), b(1), b(1), b(1), b(1), b(0)
    //     partition 2: b(1), b(1), b(1), b(1), b(0), b(100)
    //
    // With target_partitions=5 the expected division (as asserted below) is:
    //     b(100_000)
    //     b(100)
    //     b(1), b(1), b(1)
    //     b(1), b(1), b(1)
    //     b(1), b(1), b(0), b(0)
    let target = 5;
    let config = memorysrcconfig_2_partition_with_extreme_sized_batches(vec![])?;
    assert_partitioning(
        config.clone().repartitioned(target, usize::MAX, None)?,
        Some(5),
    );

    // Reduce every output partition to the row counts of its batches so the
    // whole layout is compared in one assertion.
    let row_counts = config
        .repartition_evenly_by_size(target)?
        .unwrap()
        .iter()
        .map(|partition| {
            partition
                .iter()
                .map(|batch| batch.num_rows())
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();
    let expected: Vec<Vec<usize>> = vec![
        // p1=batch(100_000)
        vec![100_000],
        // p2=batch(100)
        vec![100],
        // p3=[batch(1),batch(1),batch(1)]
        vec![1, 1, 1],
        // p4=[batch(1),batch(1),batch(1)]
        vec![1, 1, 1],
        // p5=[batch(1),batch(1),batch(0),batch(0)]
        vec![1, 1, 0, 0],
    ];
    assert_eq!(row_counts, expected);

    Ok(())
}
/// With sort information and a matching output ordering, repartitioning keeps
/// the original partitions separate and preserves batch order within them.
#[test]
fn test_repartition_with_sort_information() -> Result<()> {
    let schema = schema();
    let ordering: LexOrdering =
        [PhysicalSortExpr::new_default(col("c", &schema)?)].into();

    // Common set of functionality, with the ordering supplied both as the
    // source's sort information and as the requested output ordering.
    run_all_test_scenarios(Some(ordering.clone()), vec![ordering.clone()])?;

    // DOES preserve separate partitions (each with its own internal ordering).
    let target = 3;
    let config = memorysrcconfig_2_partition_with_different_sized_batches(vec![
        ordering.clone(),
    ])?;
    assert_partitioning(
        config
            .clone()
            .repartitioned(target, usize::MAX, Some(ordering.clone()))?,
        Some(3),
    );

    // Starting = batch(100_000), batch(10_000), batch(1_000), batch(2_000), batch(20)
    // Expected split: p1=batch(100_000), p2=[batch(10_000), batch(1_000)],
    // p3=[batch(2_000), batch(20)] (the other original partition, untouched).
    let row_counts = config
        .repartition_preserving_order(target, ordering)?
        .unwrap()
        .iter()
        .map(|partition| {
            partition
                .iter()
                .map(|batch| batch.num_rows())
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();
    let expected: Vec<Vec<usize>> =
        vec![vec![100_000], vec![10_000, 1_000], vec![2_000, 20]];
    assert_eq!(row_counts, expected);

    Ok(())
}
/// When the batches of a sorted partition are not arranged by decreasing size,
/// the split must still keep them in their original order.
#[test]
fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
    let schema = schema();
    let ordering: LexOrdering =
        [PhysicalSortExpr::new_default(col("c", &schema)?)].into();

    // Source has 1 partition with many batches of lopsided sizes; note that
    // the input batches are not ordered by decreasing size.
    let repartitioned = memorysrcconfig_1_partition_with_ordering_not_matching_size(
        vec![ordering.clone()],
    )?
    .repartitioned(2, usize::MAX, Some(ordering))?;
    assert_partitioning(repartitioned.clone(), Some(2));

    // Starting = batch(100_000), batch(1), batch(100), batch(10_000).
    // Expected split: p1=batch(100_000), p2=[batch(1), batch(100), batch(10_000)]
    // -- i.e. the order within the partition is preserved.
    let repartitioned = repartitioned.unwrap();
    let Some(repartitioned_config) = repartitioned
        .as_any()
        .downcast_ref::<MemorySourceConfig>()
    else {
        unreachable!()
    };
    let row_counts = repartitioned_config
        .partitions
        .iter()
        .map(|partition| {
            partition
                .iter()
                .map(|batch| batch.num_rows())
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();
    let expected: Vec<Vec<usize>> = vec![vec![100_000], vec![1, 100, 10_000]];
    assert_eq!(row_counts, expected);

    Ok(())
}
}