// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Traits for physical query plans, supporting parallel execution for partitioned relations.
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::{any::Any, pin::Pin};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::LogicalPlan;
use crate::{error::Result, scalar::ScalarValue};
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
use async_trait::async_trait;
use futures::stream::Stream;
use self::merge::MergeExec;
use hashbrown::HashMap;
/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
/// Returns the schema of this `RecordBatchStream`.
///
/// Implementations of this trait should guarantee that all `RecordBatch`es returned by this
/// stream have the same schema as returned from this method.
fn schema(&self) -> SchemaRef;
}
/// A pinned, sendable stream of record batches.
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync>>;
/// SQL metric type
#[derive(Debug, Clone)]
pub enum MetricType {
/// Simple counter
Counter,
}
/// SQL metric, such as a counter (e.g. number of input or output rows) or timing
/// information about a physical operator.
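///
/// A minimal usage sketch (not from the original source):
///
/// ```ignore
/// let mut metric = SQLMetric::new("outputRows", MetricType::Counter);
/// metric.add(100);
/// assert_eq!(metric.value(), 100);
/// ```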
#[derive(Debug, Clone)]
pub struct SQLMetric {
/// Metric name
name: String,
/// Metric value
value: usize,
/// Metric type
metric_type: MetricType,
}
impl SQLMetric {
/// Create a new SQLMetric
pub fn new(name: &str, metric_type: MetricType) -> Self {
Self {
name: name.to_owned(),
value: 0,
metric_type,
}
}
/// Add to the value
pub fn add(&mut self, n: usize) {
self.value += n;
}
/// Get the current value
pub fn value(&self) -> usize {
self.value
}
}
/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution.
pub trait PhysicalPlanner {
/// Create a physical plan from a logical plan
fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>>;
}
/// Partition-aware execution plan for a relation
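///
/// A minimal usage sketch (not from the original source): executing every
/// output partition of a plan, mirroring what `collect_partitioned` does below:
///
/// ```ignore
/// for i in 0..plan.output_partitioning().partition_count() {
///     let mut stream = plan.execute(i).await?;
///     // consume the returned SendableRecordBatchStream ...
/// }
/// ```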
#[async_trait]
pub trait ExecutionPlan: Debug + Send + Sync {
/// Returns the execution plan as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef;
/// Specifies the output partitioning scheme of this plan
fn output_partitioning(&self) -> Partitioning;
/// Specifies the data distribution requirements of all the children for this operator
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}
/// Get a list of child execution plans that provide the input for this plan. The returned list
/// will be empty for leaf nodes, will contain a single value for unary nodes, or two
/// values for binary nodes (such as joins).
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
/// Returns a new plan where all children were replaced by new plans.
/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
fn with_new_children(
&self,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>;
/// Execute the specified partition and return a stream of record batches
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;
/// Return a snapshot of the metrics collected during execution
fn metrics(&self) -> HashMap<String, SQLMetric> {
HashMap::new()
}
}
/// Execute the [ExecutionPlan] and collect the results in memory
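///
/// A minimal usage sketch (not from the original source), assuming `plan` is an
/// `Arc<dyn ExecutionPlan>` produced elsewhere (e.g. by a `PhysicalPlanner`):
///
/// ```ignore
/// let batches: Vec<RecordBatch> = collect(plan).await?;
/// ```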
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
match plan.output_partitioning().partition_count() {
0 => Ok(vec![]),
1 => {
let it = plan.execute(0).await?;
common::collect(it).await
}
_ => {
// merge into a single partition
let plan = MergeExec::new(plan.clone());
// MergeExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
common::collect(plan.execute(0).await?).await
}
}
}
/// Execute the [ExecutionPlan] and collect the results in memory, keeping each partition's batches separate
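///
/// A minimal usage sketch (not from the original source); each inner vector
/// holds the batches of one output partition:
///
/// ```ignore
/// let partitions: Vec<Vec<RecordBatch>> = collect_partitioned(plan).await?;
/// ```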
pub async fn collect_partitioned(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Vec<RecordBatch>>> {
match plan.output_partitioning().partition_count() {
0 => Ok(vec![]),
1 => {
let it = plan.execute(0).await?;
Ok(vec![common::collect(it).await?])
}
_ => {
let mut partitions = vec![];
for i in 0..plan.output_partitioning().partition_count() {
partitions.push(common::collect(plan.execute(i).await?).await?)
}
Ok(partitions)
}
}
}
/// Partitioning schemes supported by operators.
#[derive(Debug, Clone)]
pub enum Partitioning {
/// Allocate batches using a round-robin algorithm and the specified number of partitions
RoundRobinBatch(usize),
/// Allocate rows based on a hash of one or more expressions and the specified
/// number of partitions.
/// This partitioning scheme is not yet fully supported. See [ARROW-11011](https://issues.apache.org/jira/browse/ARROW-11011)
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
/// Unknown partitioning scheme with a known number of partitions
UnknownPartitioning(usize),
}
impl Partitioning {
/// Returns the number of partitions in this partitioning scheme
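///
/// A minimal sketch (not from the original source):
///
/// ```ignore
/// let partitioning = Partitioning::RoundRobinBatch(8);
/// assert_eq!(partitioning.partition_count(), 8);
/// ```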
pub fn partition_count(&self) -> usize {
use Partitioning::*;
match self {
RoundRobinBatch(n) => *n,
Hash(_, n) => *n,
UnknownPartitioning(n) => *n,
}
}
}
/// Distribution schemes
#[derive(Debug, Clone, PartialEq)]
pub enum Distribution {
/// Unspecified distribution
UnspecifiedDistribution,
/// A single partition is required
SinglePartition,
}
/// Represents the result from an expression
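///
/// A minimal sketch (not from the original source), assuming the `Int32`
/// variant of `ScalarValue`; a scalar can be expanded into an array with
/// `into_array`:
///
/// ```ignore
/// let value = ColumnarValue::Scalar(ScalarValue::Int32(Some(1)));
/// let array = value.into_array(3); // an Int32 array of [1, 1, 1]
/// ```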
#[derive(Clone)]
pub enum ColumnarValue {
/// Array of values
Array(ArrayRef),
/// A single value
Scalar(ScalarValue),
}
impl ColumnarValue {
fn data_type(&self) -> DataType {
match self {
ColumnarValue::Array(array_value) => array_value.data_type().clone(),
ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
}
}
fn into_array(self, num_rows: usize) -> ArrayRef {
match self {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
}
}
}
/// Expression that can be evaluated against a RecordBatch.
/// A physical expression knows its type, nullability, and how to evaluate itself.
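///
/// A minimal usage sketch (not from the original source), assuming the `col`
/// helper from the `expressions` module:
///
/// ```ignore
/// let expr = expressions::col("a");
/// let value: ColumnarValue = expr.evaluate(&batch)?;
/// ```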
pub trait PhysicalExpr: Send + Sync + Display + Debug {
/// Returns the physical expression as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// Get the data type of this expression, given the schema of the input
fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
/// Determine whether this expression is nullable, given the schema of the input
fn nullable(&self, input_schema: &Schema) -> Result<bool>;
/// Evaluate an expression against a RecordBatch
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
}
/// An aggregate expression that:
/// * knows its resulting field
/// * knows how to create its accumulator
/// * knows its accumulator's state's field
/// * knows the expressions from which its accumulator will receive values
pub trait AggregateExpr: Send + Sync + Debug {
/// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// the field of the final result of this aggregation.
fn field(&self) -> Result<Field>;
/// the accumulator used to accumulate values from the expressions.
/// the accumulator expects the same number of arguments as `expressions` and must
/// return states with the same description as `state_fields`
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;
/// the fields that encapsulate the Accumulator's state
/// the number of fields here equals the number of states that the accumulator contains
fn state_fields(&self) -> Result<Vec<Field>>;
/// expressions that are passed to the Accumulator.
/// Single-column aggregations such as `sum` have a single expression; others (e.g. `cov`) have several.
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
}
/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
/// generically accumulates values. An accumulator knows how to:
/// * update its state from inputs via `update`
/// * convert its internal state to a vector of scalar values
/// * update its state from multiple accumulators' states via `merge`
/// * compute the final value from its internal state via `evaluate`
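///
/// A minimal usage sketch (not from the original source), assuming `agg_expr`
/// implements `AggregateExpr`:
///
/// ```ignore
/// let mut acc = agg_expr.create_accumulator()?;
/// acc.update_batch(&columns)?;   // accumulate whole arrays
/// let partial = acc.state()?;    // partial state, e.g. [sum, n] for avg
/// let result = acc.evaluate()?;  // final aggregate value
/// ```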
pub trait Accumulator: Send + Sync + Debug {
/// Returns the state of the accumulator at the end of the accumulation.
// For example, an accumulator for `avg` that tracks `sum` and `n` should return a vector
// of two values, sum and n.
fn state(&self) -> Result<Vec<ScalarValue>>;
/// updates the accumulator's state from a vector of scalars.
fn update(&mut self, values: &[ScalarValue]) -> Result<()>;
/// updates the accumulator's state from a vector of arrays.
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
(0..values[0].len()).try_for_each(|index| {
let v = values
.iter()
.map(|array| ScalarValue::try_from_array(array, index))
.collect::<Result<Vec<_>>>()?;
self.update(&v)
})
}
/// updates the accumulator's state by merging in another accumulator's state, given as a vector of scalars.
fn merge(&mut self, states: &[ScalarValue]) -> Result<()>;
/// updates the accumulator's state from a vector of states.
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
(0..states[0].len()).try_for_each(|index| {
let v = states
.iter()
.map(|array| ScalarValue::try_from_array(array, index))
.collect::<Result<Vec<_>>>()?;
self.merge(&v)
})
}
/// returns its value based on its current state.
fn evaluate(&self) -> Result<ScalarValue>;
}
pub mod aggregates;
pub mod array_expressions;
pub mod coalesce_batches;
pub mod common;
#[cfg(feature = "crypto_expressions")]
pub mod crypto_expressions;
pub mod csv;
pub mod datetime_expressions;
pub mod distinct_expressions;
pub mod empty;
pub mod explain;
pub mod expressions;
pub mod filter;
pub mod functions;
pub mod group_scalar;
pub mod hash_aggregate;
pub mod hash_join;
pub mod hash_utils;
pub mod limit;
pub mod math_expressions;
pub mod memory;
pub mod merge;
pub mod parquet;
pub mod planner;
pub mod projection;
#[cfg(feature = "regex_expressions")]
pub mod regex_expressions;
pub mod repartition;
pub mod sort;
pub mod string_expressions;
pub mod type_coercion;
pub mod udaf;
pub mod udf;
#[cfg(feature = "unicode_expressions")]
pub mod unicode_expressions;
pub mod union;