blob: 47ea39fdc15553b95f10450103fe6ae10c548888 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use crate::context::PySessionContext;
use crate::errors::PyDataFusionResult;
use crate::expr::aggregate::PyAggregate;
use crate::expr::analyze::PyAnalyze;
use crate::expr::copy_to::PyCopyTo;
use crate::expr::create_catalog::PyCreateCatalog;
use crate::expr::create_catalog_schema::PyCreateCatalogSchema;
use crate::expr::create_external_table::PyCreateExternalTable;
use crate::expr::create_function::PyCreateFunction;
use crate::expr::create_index::PyCreateIndex;
use crate::expr::create_memory_table::PyCreateMemoryTable;
use crate::expr::create_view::PyCreateView;
use crate::expr::describe_table::PyDescribeTable;
use crate::expr::distinct::PyDistinct;
use crate::expr::dml::PyDmlStatement;
use crate::expr::drop_catalog_schema::PyDropCatalogSchema;
use crate::expr::drop_function::PyDropFunction;
use crate::expr::drop_table::PyDropTable;
use crate::expr::drop_view::PyDropView;
use crate::expr::empty_relation::PyEmptyRelation;
use crate::expr::explain::PyExplain;
use crate::expr::extension::PyExtension;
use crate::expr::filter::PyFilter;
use crate::expr::join::PyJoin;
use crate::expr::limit::PyLimit;
use crate::expr::projection::PyProjection;
use crate::expr::recursive_query::PyRecursiveQuery;
use crate::expr::repartition::PyRepartition;
use crate::expr::sort::PySort;
use crate::expr::statement::{
PyDeallocate, PyExecute, PyPrepare, PySetVariable, PyTransactionEnd, PyTransactionStart,
};
use crate::expr::subquery::PySubquery;
use crate::expr::subquery_alias::PySubqueryAlias;
use crate::expr::table_scan::PyTableScan;
use crate::expr::union::PyUnion;
use crate::expr::unnest::PyUnnest;
use crate::expr::values::PyValues;
use crate::expr::window::PyWindowExpr;
use datafusion::logical_expr::{DdlStatement, LogicalPlan, Statement};
use datafusion_proto::logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec};
use prost::Message;
use pyo3::{exceptions::PyRuntimeError, prelude::*, types::PyBytes};
use crate::expr::logical_node::LogicalNode;
#[pyclass(frozen, name = "LogicalPlan", module = "datafusion", subclass)]
#[derive(Debug, Clone)]
pub struct PyLogicalPlan {
pub(crate) plan: Arc<LogicalPlan>,
}
impl PyLogicalPlan {
/// creates a new PyLogicalPlan
pub fn new(plan: LogicalPlan) -> Self {
Self {
plan: Arc::new(plan),
}
}
pub fn plan(&self) -> Arc<LogicalPlan> {
self.plan.clone()
}
}
#[pymethods]
impl PyLogicalPlan {
/// Return the specific logical operator
pub fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
match self.plan.as_ref() {
LogicalPlan::Aggregate(plan) => PyAggregate::from(plan.clone()).to_variant(py),
LogicalPlan::Analyze(plan) => PyAnalyze::from(plan.clone()).to_variant(py),
LogicalPlan::Distinct(plan) => PyDistinct::from(plan.clone()).to_variant(py),
LogicalPlan::EmptyRelation(plan) => PyEmptyRelation::from(plan.clone()).to_variant(py),
LogicalPlan::Explain(plan) => PyExplain::from(plan.clone()).to_variant(py),
LogicalPlan::Extension(plan) => PyExtension::from(plan.clone()).to_variant(py),
LogicalPlan::Filter(plan) => PyFilter::from(plan.clone()).to_variant(py),
LogicalPlan::Join(plan) => PyJoin::from(plan.clone()).to_variant(py),
LogicalPlan::Limit(plan) => PyLimit::from(plan.clone()).to_variant(py),
LogicalPlan::Projection(plan) => PyProjection::from(plan.clone()).to_variant(py),
LogicalPlan::Sort(plan) => PySort::from(plan.clone()).to_variant(py),
LogicalPlan::TableScan(plan) => PyTableScan::from(plan.clone()).to_variant(py),
LogicalPlan::Subquery(plan) => PySubquery::from(plan.clone()).to_variant(py),
LogicalPlan::SubqueryAlias(plan) => PySubqueryAlias::from(plan.clone()).to_variant(py),
LogicalPlan::Unnest(plan) => PyUnnest::from(plan.clone()).to_variant(py),
LogicalPlan::Window(plan) => PyWindowExpr::from(plan.clone()).to_variant(py),
LogicalPlan::Repartition(plan) => PyRepartition::from(plan.clone()).to_variant(py),
LogicalPlan::Union(plan) => PyUnion::from(plan.clone()).to_variant(py),
LogicalPlan::Statement(plan) => match plan {
Statement::TransactionStart(plan) => {
PyTransactionStart::from(plan.clone()).to_variant(py)
}
Statement::TransactionEnd(plan) => {
PyTransactionEnd::from(plan.clone()).to_variant(py)
}
Statement::SetVariable(plan) => PySetVariable::from(plan.clone()).to_variant(py),
Statement::Prepare(plan) => PyPrepare::from(plan.clone()).to_variant(py),
Statement::Execute(plan) => PyExecute::from(plan.clone()).to_variant(py),
Statement::Deallocate(plan) => PyDeallocate::from(plan.clone()).to_variant(py),
},
LogicalPlan::Values(plan) => PyValues::from(plan.clone()).to_variant(py),
LogicalPlan::Dml(plan) => PyDmlStatement::from(plan.clone()).to_variant(py),
LogicalPlan::Ddl(plan) => match plan {
DdlStatement::CreateExternalTable(plan) => {
PyCreateExternalTable::from(plan.clone()).to_variant(py)
}
DdlStatement::CreateMemoryTable(plan) => {
PyCreateMemoryTable::from(plan.clone()).to_variant(py)
}
DdlStatement::CreateView(plan) => PyCreateView::from(plan.clone()).to_variant(py),
DdlStatement::CreateCatalogSchema(plan) => {
PyCreateCatalogSchema::from(plan.clone()).to_variant(py)
}
DdlStatement::CreateCatalog(plan) => {
PyCreateCatalog::from(plan.clone()).to_variant(py)
}
DdlStatement::CreateIndex(plan) => PyCreateIndex::from(plan.clone()).to_variant(py),
DdlStatement::DropTable(plan) => PyDropTable::from(plan.clone()).to_variant(py),
DdlStatement::DropView(plan) => PyDropView::from(plan.clone()).to_variant(py),
DdlStatement::DropCatalogSchema(plan) => {
PyDropCatalogSchema::from(plan.clone()).to_variant(py)
}
DdlStatement::CreateFunction(plan) => {
PyCreateFunction::from(plan.clone()).to_variant(py)
}
DdlStatement::DropFunction(plan) => {
PyDropFunction::from(plan.clone()).to_variant(py)
}
},
LogicalPlan::Copy(plan) => PyCopyTo::from(plan.clone()).to_variant(py),
LogicalPlan::DescribeTable(plan) => PyDescribeTable::from(plan.clone()).to_variant(py),
LogicalPlan::RecursiveQuery(plan) => {
PyRecursiveQuery::from(plan.clone()).to_variant(py)
}
}
}
/// Get the inputs to this plan
fn inputs(&self) -> Vec<PyLogicalPlan> {
let mut inputs = vec![];
for input in self.plan.inputs() {
inputs.push(input.to_owned().into());
}
inputs
}
fn __repr__(&self) -> PyResult<String> {
Ok(format!("{:?}", self.plan))
}
fn display(&self) -> String {
format!("{}", self.plan.display())
}
fn display_indent(&self) -> String {
format!("{}", self.plan.display_indent())
}
fn display_indent_schema(&self) -> String {
format!("{}", self.plan.display_indent_schema())
}
fn display_graphviz(&self) -> String {
format!("{}", self.plan.display_graphviz())
}
pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyDataFusionResult<Bound<'py, PyBytes>> {
let codec = DefaultLogicalExtensionCodec {};
let proto =
datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(&self.plan, &codec)?;
let bytes = proto.encode_to_vec();
Ok(PyBytes::new(py, &bytes))
}
#[staticmethod]
pub fn from_proto(
ctx: PySessionContext,
proto_msg: Bound<'_, PyBytes>,
) -> PyDataFusionResult<Self> {
let bytes: &[u8] = proto_msg.extract()?;
let proto_plan =
datafusion_proto::protobuf::LogicalPlanNode::decode(bytes).map_err(|e| {
PyRuntimeError::new_err(format!(
"Unable to decode logical node from serialized bytes: {e}"
))
})?;
let codec = DefaultLogicalExtensionCodec {};
let plan = proto_plan.try_into_logical_plan(&ctx.ctx, &codec)?;
Ok(Self::new(plan))
}
}
impl From<PyLogicalPlan> for LogicalPlan {
fn from(logical_plan: PyLogicalPlan) -> LogicalPlan {
logical_plan.plan.as_ref().clone()
}
}
impl From<LogicalPlan> for PyLogicalPlan {
fn from(logical_plan: LogicalPlan) -> PyLogicalPlan {
PyLogicalPlan {
plan: Arc::new(logical_plan),
}
}
}