blob: dc3a7201ea239fd1d82e01bfcadfea8563662149 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow::array::RecordBatch;
use arrow::pyarrow::ToPyArrow;
use datafusion::common::internal_datafusion_err;
use datafusion::common::internal_err;
use datafusion::common::tree_node::Transformed;
use datafusion::common::tree_node::TreeNode;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::displayable;
use datafusion::physical_plan::joins::NestedLoopJoinExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use datafusion::prelude::DataFrame;
use datafusion_python::errors::PyDataFusionError;
use datafusion_python::physical_plan::PyExecutionPlan;
use datafusion_python::sql::logical::PyLogicalPlan;
use datafusion_python::utils::wait_for_future;
use futures::stream::StreamExt;
use itertools::Itertools;
use log::trace;
use pyo3::exceptions::PyStopAsyncIteration;
use pyo3::exceptions::PyStopIteration;
use pyo3::prelude::*;
use std::borrow::Cow;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::isolator::PartitionIsolatorExec;
use crate::max_rows::MaxRowsExec;
use crate::pre_fetch::PrefetchExec;
use crate::stage::DFRayStageExec;
use crate::stage_reader::DFRayStageReaderExec;
use crate::util::ResultExt;
use crate::util::collect_from_stage;
use crate::util::display_plan_with_partition_counts;
use crate::util::physical_plan_to_bytes;
/// Internal Rust class behind the DFRayDataFrame Python object.
///
/// It is a container for the plan of a query.
///
/// This class plays two important roles. First, it defines the stages of the plan
/// by walking the plan of the dataframe provided to us in the constructor.
/// That plan contains RayStageExec nodes, which are merely markers that indicate where
/// to split the plan into discrete stages that can be hosted by a StageService.
///
/// The second role of this object is to fetch record batches from the final
/// stage of the plan and return them to Python.
#[pyclass]
pub struct DFRayDataFrame {
/// Holds the logical plan of the query we will execute.
df: DataFrame,
/// The physical plan we will use to consume the final stage.
/// It is `None` until `stages` is run, and is taken (reset to `None`)
/// by `read_final_stage`.
final_plan: Option<Arc<dyn ExecutionPlan>>,
}
impl DFRayDataFrame {
    /// Wraps `df` in a new `DFRayDataFrame`.
    ///
    /// No final-stage plan exists yet; it is populated when `stages` is called.
    pub fn new(df: DataFrame) -> Self {
        let final_plan = None;
        Self { df, final_plan }
    }
}
#[pymethods]
impl DFRayDataFrame {
#[pyo3(signature = (batch_size, prefetch_buffer_size, partitions_per_worker=None))]
fn stages(
&mut self,
py: Python,
batch_size: usize,
prefetch_buffer_size: usize,
partitions_per_worker: Option<usize>,
) -> PyResult<Vec<PyDFRayStage>> {
let mut stages = vec![];
let mut partition_groups = vec![];
let mut full_partitions = false;
// We walk up the tree from the leaves to find the stages, record ray stages, and replace
// each ray stage with a corresponding ray reader stage.
let up = |plan: Arc<dyn ExecutionPlan>| {
trace!(
"Examining plan up: {}",
displayable(plan.as_ref()).one_line()
);
if let Some(stage_exec) = plan.as_any().downcast_ref::<DFRayStageExec>() {
trace!("ray stage exec");
let input = plan.children();
assert!(input.len() == 1, "RayStageExec must have exactly one child");
let input = input[0];
let replacement = Arc::new(DFRayStageReaderExec::try_new(
plan.output_partitioning().clone(),
input.schema(),
stage_exec.stage_id,
)?) as Arc<dyn ExecutionPlan>;
let stage = PyDFRayStage::new(
stage_exec.stage_id,
input.clone(),
partition_groups.clone(),
full_partitions,
);
partition_groups = vec![];
full_partitions = false;
stages.push(stage);
Ok(Transformed::yes(replacement))
} else if plan.as_any().downcast_ref::<RepartitionExec>().is_some() {
trace!("repartition exec");
let (calculated_partition_groups, replacement) = build_replacement(
plan,
prefetch_buffer_size,
partitions_per_worker,
true,
batch_size,
batch_size,
)?;
partition_groups = calculated_partition_groups;
Ok(Transformed::yes(replacement))
} else if plan.as_any().downcast_ref::<SortExec>().is_some() {
trace!("sort exec");
let (calculated_partition_groups, replacement) = build_replacement(
plan,
prefetch_buffer_size,
partitions_per_worker,
false,
batch_size,
batch_size,
)?;
partition_groups = calculated_partition_groups;
full_partitions = true;
Ok(Transformed::yes(replacement))
} else if plan.as_any().downcast_ref::<NestedLoopJoinExec>().is_some() {
trace!("nested loop join exec");
// NestedLoopJoinExec must be on a stage by itself as it materializes the entire left
// side of the join and is not suitable to be executed in a partitioned manner.
let mut replacement = plan.clone();
let partition_count = plan.output_partitioning().partition_count();
trace!("nested join output partitioning {}", partition_count);
replacement = Arc::new(MaxRowsExec::new(
Arc::new(CoalesceBatchesExec::new(replacement, batch_size))
as Arc<dyn ExecutionPlan>,
batch_size,
)) as Arc<dyn ExecutionPlan>;
if prefetch_buffer_size > 0 {
replacement = Arc::new(PrefetchExec::new(replacement, prefetch_buffer_size))
as Arc<dyn ExecutionPlan>;
}
partition_groups = vec![(0..partition_count).collect()];
full_partitions = true;
Ok(Transformed::yes(replacement))
} else {
trace!("not special case");
Ok(Transformed::no(plan))
}
};
let physical_plan = wait_for_future(py, self.df.clone().create_physical_plan())?;
physical_plan.transform_up(up)?;
// add coalesce and max rows to last stage
let mut last_stage = stages
.pop()
.ok_or(internal_datafusion_err!("No stages found"))?;
if last_stage.num_output_partitions() > 1 {
return internal_err!("Last stage expected to have one partition").to_py_err();
}
last_stage = PyDFRayStage::new(
last_stage.stage_id,
Arc::new(MaxRowsExec::new(
Arc::new(CoalesceBatchesExec::new(last_stage.plan, batch_size))
as Arc<dyn ExecutionPlan>,
batch_size,
)) as Arc<dyn ExecutionPlan>,
vec![vec![0]],
true,
);
// done fixing last stage
let reader_plan = Arc::new(DFRayStageReaderExec::try_new_from_input(
last_stage.plan.clone(),
last_stage.stage_id,
)?) as Arc<dyn ExecutionPlan>;
stages.push(last_stage);
self.final_plan = Some(reader_plan);
Ok(stages)
}
/// Creates the physical plan for the wrapped dataframe and returns it wrapped in a
/// `PyExecutionPlan`.
///
/// Blocks on planning via `wait_for_future`; planning errors are propagated.
fn execution_plan(&self, py: Python) -> PyResult<PyExecutionPlan> {
    let planning = self.df.clone().create_physical_plan();
    let physical = wait_for_future(py, planning)?;
    Ok(PyExecutionPlan::new(physical))
}
/// Creates the physical plan for the wrapped dataframe and renders it as a string
/// using `display_plan_with_partition_counts`.
///
/// Blocks on planning via `wait_for_future`; planning errors are propagated.
fn display_execution_plan(&self, py: Python) -> PyResult<String> {
    let planning = self.df.clone().create_physical_plan();
    let physical = wait_for_future(py, planning)?;
    let rendered = display_plan_with_partition_counts(&physical).to_string();
    Ok(rendered)
}
/// Returns a copy of the dataframe's logical plan wrapped in a `PyLogicalPlan`.
fn logical_plan(&self) -> PyResult<PyLogicalPlan> {
    let plan = self.df.logical_plan().clone();
    Ok(PyLogicalPlan::new(plan))
}
/// Returns the dataframe's schema converted to a pyarrow object.
fn schema(&self, py: Python) -> PyResult<PyObject> {
    let arrow_schema = self.df.schema().as_arrow();
    arrow_schema.to_pyarrow(py)
}
/// Optimizes a copy of the dataframe's logical plan and returns it wrapped in a
/// `PyLogicalPlan`; optimizer errors are propagated.
fn optimized_logical_plan(&self) -> PyResult<PyLogicalPlan> {
    let optimized = self.df.clone().into_optimized_plan()?;
    Ok(PyLogicalPlan::new(optimized))
}
fn read_final_stage(
&mut self,
py: Python,
stage_id: usize,
stage_addr: &str,
) -> PyResult<PyRecordBatchStream> {
wait_for_future(
py,
collect_from_stage(
stage_id,
0,
stage_addr,
self.final_plan.take().unwrap().clone(),
),
)
.map(PyRecordBatchStream::new)
.to_py_err()
}
}
#[allow(clippy::type_complexity)]
fn build_replacement(
plan: Arc<dyn ExecutionPlan>,
prefetch_buffer_size: usize,
partitions_per_worker: Option<usize>,
isolate: bool,
max_rows: usize,
inner_batch_size: usize,
) -> Result<(Vec<Vec<usize>>, Arc<dyn ExecutionPlan>), DataFusionError> {
let mut replacement = plan.clone();
let children = plan.children();
assert!(children.len() == 1, "Unexpected plan structure");
let child = children[0];
let partition_count = child.output_partitioning().partition_count();
trace!(
"build_replacement for {}, partition_count: {}",
displayable(plan.as_ref()).one_line(),
partition_count
);
let partition_groups = match partitions_per_worker {
Some(p) => (0..partition_count)
.chunks(p)
.into_iter()
.map(|chunk| chunk.collect())
.collect(),
None => vec![(0..partition_count).collect()],
};
if isolate && partition_groups.len() > 1 {
let new_child = Arc::new(PartitionIsolatorExec::new(
child.clone(),
partitions_per_worker.unwrap(), // we know it is a Some, here.
));
replacement = replacement.clone().with_new_children(vec![new_child])?;
}
// insert a coalescing batches here too so that we aren't sending
// too small (or too big) of batches over the network
replacement = Arc::new(MaxRowsExec::new(
Arc::new(CoalesceBatchesExec::new(replacement, inner_batch_size)) as Arc<dyn ExecutionPlan>,
max_rows,
)) as Arc<dyn ExecutionPlan>;
if prefetch_buffer_size > 0 {
replacement = Arc::new(PrefetchExec::new(replacement, prefetch_buffer_size))
as Arc<dyn ExecutionPlan>;
}
Ok((partition_groups, replacement))
}
/// A Python class that holds the physical plan of a single stage.
#[pyclass]
pub struct PyDFRayStage {
/// Our stage id.
stage_id: usize,
/// The physical plan of our stage.
plan: Arc<dyn ExecutionPlan>,
/// The partition groups for this stage; each inner vector lists the
/// partition indices that belong to one group.
partition_groups: Vec<Vec<usize>>,
/// Are we hosting the complete partitions? If not,
/// then RayStageReaderExecs will be inserted to consume their desired partition
/// from all stages with this same id and merge the results, using a
/// CombinedRecordBatchStream.
full_partitions: bool,
}
impl PyDFRayStage {
fn new(
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
partition_groups: Vec<Vec<usize>>,
full_partitions: bool,
) -> Self {
Self {
stage_id,
plan,
partition_groups,
full_partitions,
}
}
}
#[pymethods]
impl PyDFRayStage {
#[getter]
fn stage_id(&self) -> usize {
self.stage_id
}
#[getter]
fn partition_groups(&self) -> Vec<Vec<usize>> {
self.partition_groups.clone()
}
#[getter]
fn full_partitions(&self) -> bool {
self.full_partitions
}
/// returns the number of output partitions of this stage
#[getter]
fn num_output_partitions(&self) -> usize {
self.plan.output_partitioning().partition_count()
}
/// returns the stage ids of that we need to read from in order to execute
#[getter]
pub fn child_stage_ids(&self) -> PyResult<Vec<usize>> {
let mut result = vec![];
self.plan
.clone()
.transform_down(|node: Arc<dyn ExecutionPlan>| {
if let Some(reader) = node.as_any().downcast_ref::<DFRayStageReaderExec>() {
result.push(reader.stage_id);
}
Ok(Transformed::no(node))
})?;
Ok(result)
}
pub fn execution_plan(&self) -> PyExecutionPlan {
PyExecutionPlan::new(self.plan.clone())
}
fn display_execution_plan(&self) -> PyResult<String> {
Ok(display_plan_with_partition_counts(&self.plan).to_string())
}
pub fn plan_bytes(&self) -> PyResult<Cow<[u8]>> {
let plan_bytes = physical_plan_to_bytes(self.plan.clone())?;
Ok(Cow::Owned(plan_bytes))
}
}
// PyRecordBatch and PyRecordBatchStream are borrowed from datafusion-python and slightly
// modified, because they are not publicly exposed by that crate.

/// Python-visible wrapper around an Arrow `RecordBatch`.
#[pyclass]
pub struct PyRecordBatch {
/// The wrapped record batch.
pub batch: RecordBatch,
}
#[pymethods]
impl PyRecordBatch {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
self.batch.to_pyarrow(py)
}
}
impl From<RecordBatch> for PyRecordBatch {
fn from(batch: RecordBatch) -> Self {
Self { batch }
}
}
/// Python-visible iterator (sync and async) over a stream of record batches.
#[pyclass]
pub struct PyRecordBatchStream {
/// The underlying stream, behind a shared async mutex so that a clone of the handle
/// can be moved into the future that fetches the next batch.
stream: Arc<Mutex<SendableRecordBatchStream>>,
}
impl PyRecordBatchStream {
    /// Wraps `stream` so that it can be iterated from Python.
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let stream = Arc::new(Mutex::new(stream));
        Self { stream }
    }
}
#[pymethods]
impl PyRecordBatchStream {
fn next(&mut self, py: Python) -> PyResult<PyObject> {
let stream = self.stream.clone();
wait_for_future(py, next_stream(stream, true)).and_then(|b| b.to_pyarrow(py))
}
fn __next__(&mut self, py: Python) -> PyResult<PyObject> {
self.next(py)
}
fn __anext__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let stream = self.stream.clone();
pyo3_async_runtimes::tokio::future_into_py(py, next_stream(stream, false))
}
fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
}
async fn next_stream(
stream: Arc<Mutex<SendableRecordBatchStream>>,
sync: bool,
) -> PyResult<PyRecordBatch> {
let mut stream = stream.lock().await;
match stream.next().await {
Some(Ok(batch)) => Ok(batch.into()),
Some(Err(e)) => Err(PyDataFusionError::from(e))?,
None => {
// Depending on whether the iteration is sync or not, we raise either a
// StopIteration or a StopAsyncIteration
if sync {
Err(PyStopIteration::new_err("stream exhausted"))
} else {
Err(PyStopAsyncIteration::new_err("stream exhausted"))
}
}
}
}