blob: 7220a972fd8929f6dd03736106a348acd63ad064 [file]
use std::{fmt::Formatter, sync::Arc};
use datafusion::{
common::internal_datafusion_err,
error::Result,
execution::SendableRecordBatchStream,
physical_plan::{
DisplayAs, DisplayFormatType, EmptyRecordBatchStream, ExecutionPlan, Partitioning,
PlanProperties,
},
};
use log::error;
pub struct PartitionGroup(pub Vec<usize>);
/// This is a simple execution plan that isolates a partition from the input plan
/// It will advertise that it has a single partition and when
/// asked to execute, it will execute a particular partition from the child
/// input plan.
///
/// This allows us to execute Repartition Exec's on different processes
/// by showing each one only a single child partition
#[derive(Debug)]
pub struct PartitionIsolatorExec {
pub input: Arc<dyn ExecutionPlan>,
properties: PlanProperties,
pub partition_count: usize,
}
impl PartitionIsolatorExec {
pub fn new(input: Arc<dyn ExecutionPlan>, partition_count: usize) -> Self {
// We advertise that we only have partition_count partitions
let properties = input
.properties()
.clone()
.with_partitioning(Partitioning::UnknownPartitioning(partition_count));
Self {
input,
properties,
partition_count,
}
}
}
impl DisplayAs for PartitionIsolatorExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"PartitionIsolatorExec [providing upto {} partitions]",
self.partition_count
)
}
}
impl ExecutionPlan for PartitionIsolatorExec {
fn name(&self) -> &str {
"PartitionIsolatorExec"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<&std::sync::Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(
self: std::sync::Arc<Self>,
children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
) -> Result<std::sync::Arc<dyn ExecutionPlan>> {
// TODO: generalize this
assert_eq!(children.len(), 1);
Ok(Arc::new(Self::new(
children[0].clone(),
self.partition_count,
)))
}
fn execute(
&self,
partition: usize,
context: std::sync::Arc<datafusion::execution::TaskContext>,
) -> Result<SendableRecordBatchStream> {
let config = context.session_config();
let partition_group = &config
.get_extension::<PartitionGroup>()
.ok_or(internal_datafusion_err!(
"PartitionGroup not set in session config"
))?
.0;
if partition > self.partition_count {
error!(
"PartitionIsolatorExec asked to execute partition {} but only has {} partitions",
partition, self.partition_count
);
return Err(internal_datafusion_err!(
"Invalid partition {} for PartitionIsolatorExec",
partition
));
}
let output_stream = match partition_group.get(partition) {
Some(actual_partition_number) => self.input.execute(*actual_partition_number, context),
None => Ok(Box::pin(EmptyRecordBatchStream::new(self.input.schema()))
as SendableRecordBatchStream),
};
output_stream
}
}