| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Serde code to convert DataFusion physical plans and physical expressions to Ballista protocol |
| //! buffer format, allowing DataFusion physical plans to be serialized and transmitted between |
| //! processes. |
| |
| use std::{ |
| convert::{TryFrom, TryInto}, |
| str::FromStr, |
| sync::Arc, |
| }; |
| |
| use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; |
| use datafusion::physical_plan::csv::CsvExec; |
| use datafusion::physical_plan::expressions::{ |
| CaseExpr, InListExpr, IsNotNullExpr, IsNullExpr, NegativeExpr, NotExpr, |
| }; |
| use datafusion::physical_plan::expressions::{CastExpr, TryCastExpr}; |
| use datafusion::physical_plan::filter::FilterExec; |
| use datafusion::physical_plan::hash_aggregate::AggregateMode; |
| use datafusion::physical_plan::hash_join::HashJoinExec; |
| use datafusion::physical_plan::hash_utils::JoinType; |
| use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; |
| use datafusion::physical_plan::parquet::ParquetExec; |
| use datafusion::physical_plan::projection::ProjectionExec; |
| use datafusion::physical_plan::sort::SortExec; |
| use datafusion::{ |
| physical_plan::expressions::{Count, Literal}, |
| scalar::ScalarValue, |
| }; |
| |
| use datafusion::physical_plan::{ |
| empty::EmptyExec, |
| expressions::{Avg, BinaryExpr, Column, Sum}, |
| Partitioning, |
| }; |
| use datafusion::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr}; |
| |
| use datafusion::physical_plan::hash_aggregate::HashAggregateExec; |
| use protobuf::physical_plan_node::PhysicalPlanType; |
| |
| use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec}; |
| use crate::serde::protobuf::repartition_exec_node::PartitionMethod; |
| use crate::serde::{protobuf, BallistaError}; |
| use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr}; |
| use datafusion::physical_plan::merge::MergeExec; |
| use datafusion::physical_plan::repartition::RepartitionExec; |
| |
| impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> { |
| type Error = BallistaError; |
| |
| fn try_into(self) -> Result<protobuf::PhysicalPlanNode, Self::Error> { |
| let plan = self.as_any(); |
| |
| if let Some(exec) = plan.downcast_ref::<ProjectionExec>() { |
| let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?; |
| let expr = exec |
| .expr() |
| .iter() |
| .map(|expr| expr.0.clone().try_into()) |
| .collect::<Result<Vec<_>, Self::Error>>()?; |
| let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect(); |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::Projection(Box::new( |
| protobuf::ProjectionExecNode { |
| input: Some(Box::new(input)), |
| expr, |
| expr_name, |
| }, |
| ))), |
| }) |
| } else if let Some(exec) = plan.downcast_ref::<FilterExec>() { |
| let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?; |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::Filter(Box::new( |
| protobuf::FilterExecNode { |
| input: Some(Box::new(input)), |
| expr: Some(exec.predicate().clone().try_into()?), |
| }, |
| ))), |
| }) |
| } else if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() { |
| let input: protobuf::PhysicalPlanNode = |
| limit.input().to_owned().try_into()?; |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new( |
| protobuf::GlobalLimitExecNode { |
| input: Some(Box::new(input)), |
| limit: limit.limit() as u32, |
| }, |
| ))), |
| }) |
| } else if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() { |
| let input: protobuf::PhysicalPlanNode = |
| limit.input().to_owned().try_into()?; |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new( |
| protobuf::LocalLimitExecNode { |
| input: Some(Box::new(input)), |
| limit: limit.limit() as u32, |
| }, |
| ))), |
| }) |
| } else if let Some(exec) = plan.downcast_ref::<HashJoinExec>() { |
| let left: protobuf::PhysicalPlanNode = exec.left().to_owned().try_into()?; |
| let right: protobuf::PhysicalPlanNode = exec.right().to_owned().try_into()?; |
| let on: Vec<protobuf::JoinOn> = exec |
| .on() |
| .iter() |
| .map(|tuple| protobuf::JoinOn { |
| left: tuple.0.to_owned(), |
| right: tuple.1.to_owned(), |
| }) |
| .collect(); |
| let join_type = match exec.join_type() { |
| JoinType::Inner => protobuf::JoinType::Inner, |
| JoinType::Left => protobuf::JoinType::Left, |
| JoinType::Right => protobuf::JoinType::Right, |
| }; |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new( |
| protobuf::HashJoinExecNode { |
| left: Some(Box::new(left)), |
| right: Some(Box::new(right)), |
| on, |
| join_type: join_type.into(), |
| }, |
| ))), |
| }) |
| } else if let Some(exec) = plan.downcast_ref::<HashAggregateExec>() { |
| let groups = exec |
| .group_expr() |
| .iter() |
| .map(|expr| expr.0.to_owned().try_into()) |
| .collect::<Result<Vec<_>, BallistaError>>()?; |
| let group_names = exec |
| .group_expr() |
| .iter() |
| .map(|expr| expr.1.to_owned()) |
| .collect(); |
| let agg = exec |
| .aggr_expr() |
| .iter() |
| .map(|expr| expr.to_owned().try_into()) |
| .collect::<Result<Vec<_>, BallistaError>>()?; |
| let agg_names = exec |
| .aggr_expr() |
| .iter() |
| .map(|expr| match expr.field() { |
| Ok(field) => Ok(field.name().clone()), |
| Err(e) => Err(BallistaError::DataFusionError(e)), |
| }) |
| .collect::<Result<_, Self::Error>>()?; |
| |
| let agg_mode = match exec.mode() { |
| AggregateMode::Partial => protobuf::AggregateMode::Partial, |
| AggregateMode::Final => protobuf::AggregateMode::Final, |
| }; |
| let input_schema = exec.input_schema(); |
| let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?; |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::HashAggregate(Box::new( |
| protobuf::HashAggregateExecNode { |
| group_expr: groups, |
| group_expr_name: group_names, |
| aggr_expr: agg, |
| aggr_expr_name: agg_names, |
| mode: agg_mode as i32, |
| input: Some(Box::new(input)), |
| input_schema: Some(input_schema.as_ref().into()), |
| }, |
| ))), |
| }) |
| } else if let Some(empty) = plan.downcast_ref::<EmptyExec>() { |
| let schema = empty.schema().as_ref().into(); |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::Empty( |
| protobuf::EmptyExecNode { |
| produce_one_row: empty.produce_one_row(), |
| schema: Some(schema), |
| }, |
| )), |
| }) |
| } else if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() |
| { |
| let input: protobuf::PhysicalPlanNode = |
| coalesce_batches.input().to_owned().try_into()?; |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new( |
| protobuf::CoalesceBatchesExecNode { |
| input: Some(Box::new(input)), |
| target_batch_size: coalesce_batches.target_batch_size() as u32, |
| }, |
| ))), |
| }) |
| } else if let Some(exec) = plan.downcast_ref::<CsvExec>() { |
| let delimiter = [*exec.delimiter().ok_or_else(|| { |
| BallistaError::General("Delimeter is not set for CsvExec".to_owned()) |
| })?]; |
| let delimiter = std::str::from_utf8(&delimiter).map_err(|_| { |
| BallistaError::General("Invalid CSV delimiter".to_owned()) |
| })?; |
| |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::CsvScan( |
| protobuf::CsvScanExecNode { |
| path: exec.path().to_owned(), |
| filename: exec.filenames().to_vec(), |
| projection: exec |
| .projection() |
| .ok_or_else(|| { |
| BallistaError::General( |
| "projection in CsvExec dosn not exist.".to_owned(), |
| ) |
| })? |
| .iter() |
| .map(|n| *n as u32) |
| .collect(), |
| file_extension: exec.file_extension().to_owned(), |
| schema: Some(exec.file_schema().as_ref().into()), |
| has_header: exec.has_header(), |
| delimiter: delimiter.to_string(), |
| batch_size: exec.batch_size() as u32, |
| }, |
| )), |
| }) |
| } else if let Some(exec) = plan.downcast_ref::<ParquetExec>() { |
| let filenames = exec |
| .partitions() |
| .iter() |
| .flat_map(|part| part.filenames().to_owned()) |
| .collect(); |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::ParquetScan( |
| protobuf::ParquetScanExecNode { |
| filename: filenames, |
| projection: exec |
| .projection() |
| .as_ref() |
| .iter() |
| .map(|n| *n as u32) |
| .collect(), |
| num_partitions: exec.partitions().len() as u32, |
| batch_size: exec.batch_size() as u32, |
| }, |
| )), |
| }) |
| } else if let Some(exec) = plan.downcast_ref::<ShuffleReaderExec>() { |
| let partition_location = exec |
| .partition_location |
| .iter() |
| .map(|l| l.clone().try_into()) |
| .collect::<Result<_, _>>()?; |
| |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::ShuffleReader( |
| protobuf::ShuffleReaderExecNode { |
| partition_location, |
| schema: Some(exec.schema().as_ref().into()), |
| }, |
| )), |
| }) |
| } else if let Some(exec) = plan.downcast_ref::<MergeExec>() { |
| let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?; |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::Merge(Box::new( |
| protobuf::MergeExecNode { |
| input: Some(Box::new(input)), |
| }, |
| ))), |
| }) |
| } else if let Some(exec) = plan.downcast_ref::<RepartitionExec>() { |
| let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?; |
| |
| let pb_partition_method = match exec.partitioning() { |
| Partitioning::Hash(exprs, partition_count) => { |
| PartitionMethod::Hash(protobuf::HashRepartition { |
| hash_expr: exprs |
| .iter() |
| .map(|expr| expr.clone().try_into()) |
| .collect::<Result<Vec<_>, BallistaError>>()?, |
| partition_count: *partition_count as u64, |
| }) |
| } |
| Partitioning::RoundRobinBatch(partition_count) => { |
| PartitionMethod::RoundRobin(*partition_count as u64) |
| } |
| Partitioning::UnknownPartitioning(partition_count) => { |
| PartitionMethod::Unknown(*partition_count as u64) |
| } |
| }; |
| |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new( |
| protobuf::RepartitionExecNode { |
| input: Some(Box::new(input)), |
| partition_method: Some(pb_partition_method), |
| }, |
| ))), |
| }) |
| } else if let Some(exec) = plan.downcast_ref::<SortExec>() { |
| let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?; |
| let expr = exec |
| .expr() |
| .iter() |
| .map(|expr| { |
| let sort_expr = Box::new(protobuf::SortExprNode { |
| expr: Some(Box::new(expr.expr.to_owned().try_into()?)), |
| asc: !expr.options.descending, |
| nulls_first: expr.options.nulls_first, |
| }); |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::Sort( |
| sort_expr, |
| )), |
| }) |
| }) |
| .collect::<Result<Vec<_>, Self::Error>>()?; |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::Sort(Box::new( |
| protobuf::SortExecNode { |
| input: Some(Box::new(input)), |
| expr, |
| }, |
| ))), |
| }) |
| } else if let Some(exec) = plan.downcast_ref::<UnresolvedShuffleExec>() { |
| Ok(protobuf::PhysicalPlanNode { |
| physical_plan_type: Some(PhysicalPlanType::Unresolved( |
| protobuf::UnresolvedShuffleExecNode { |
| query_stage_ids: exec |
| .query_stage_ids |
| .iter() |
| .map(|id| *id as u32) |
| .collect(), |
| schema: Some(exec.schema().as_ref().into()), |
| partition_count: exec.partition_count as u32, |
| }, |
| )), |
| }) |
| } else { |
| Err(BallistaError::General(format!( |
| "physical plan to_proto unsupported plan {:?}", |
| self |
| ))) |
| } |
| } |
| } |
| |
| impl TryInto<protobuf::LogicalExprNode> for Arc<dyn AggregateExpr> { |
| type Error = BallistaError; |
| |
| fn try_into(self) -> Result<protobuf::LogicalExprNode, Self::Error> { |
| let aggr_function = if self.as_any().downcast_ref::<Avg>().is_some() { |
| Ok(protobuf::AggregateFunction::Avg.into()) |
| } else if self.as_any().downcast_ref::<Sum>().is_some() { |
| Ok(protobuf::AggregateFunction::Sum.into()) |
| } else if self.as_any().downcast_ref::<Count>().is_some() { |
| Ok(protobuf::AggregateFunction::Count.into()) |
| } else { |
| Err(BallistaError::NotImplemented(format!( |
| "Aggregate function not supported: {:?}", |
| self |
| ))) |
| }?; |
| let expressions: Vec<protobuf::LogicalExprNode> = self |
| .expressions() |
| .iter() |
| .map(|e| e.clone().try_into()) |
| .collect::<Result<Vec<_>, BallistaError>>()?; |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::AggregateExpr( |
| Box::new(protobuf::AggregateExprNode { |
| aggr_function, |
| expr: Some(Box::new(expressions[0].clone())), |
| }), |
| )), |
| }) |
| } |
| } |
| |
| impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::LogicalExprNode { |
| type Error = BallistaError; |
| |
| fn try_from(value: Arc<dyn PhysicalExpr>) -> Result<Self, Self::Error> { |
| let expr = value.as_any(); |
| |
| if let Some(expr) = expr.downcast_ref::<Column>() { |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::ColumnName( |
| expr.name().to_owned(), |
| )), |
| }) |
| } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() { |
| let binary_expr = Box::new(protobuf::BinaryExprNode { |
| l: Some(Box::new(expr.left().to_owned().try_into()?)), |
| r: Some(Box::new(expr.right().to_owned().try_into()?)), |
| op: format!("{:?}", expr.op()), |
| }); |
| |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::BinaryExpr( |
| binary_expr, |
| )), |
| }) |
| } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() { |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::Case(Box::new( |
| protobuf::CaseNode { |
| expr: expr |
| .expr() |
| .as_ref() |
| .map(|exp| exp.clone().try_into().map(Box::new)) |
| .transpose()?, |
| when_then_expr: expr |
| .when_then_expr() |
| .iter() |
| .map(|(when_expr, then_expr)| { |
| try_parse_when_then_expr(when_expr, then_expr) |
| }) |
| .collect::<Result<Vec<protobuf::WhenThen>, Self::Error>>()?, |
| else_expr: expr |
| .else_expr() |
| .map(|a| a.clone().try_into().map(Box::new)) |
| .transpose()?, |
| }, |
| ))), |
| }) |
| } else if let Some(expr) = expr.downcast_ref::<NotExpr>() { |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::NotExpr( |
| Box::new(protobuf::Not { |
| expr: Some(Box::new(expr.arg().to_owned().try_into()?)), |
| }), |
| )), |
| }) |
| } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() { |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::IsNullExpr( |
| Box::new(protobuf::IsNull { |
| expr: Some(Box::new(expr.arg().to_owned().try_into()?)), |
| }), |
| )), |
| }) |
| } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() { |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::IsNotNullExpr( |
| Box::new(protobuf::IsNotNull { |
| expr: Some(Box::new(expr.arg().to_owned().try_into()?)), |
| }), |
| )), |
| }) |
| } else if let Some(expr) = expr.downcast_ref::<InListExpr>() { |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some( |
| protobuf::logical_expr_node::ExprType::InList( |
| Box::new( |
| protobuf::InListNode { |
| expr: Some(Box::new(expr.expr().to_owned().try_into()?)), |
| list: expr |
| .list() |
| .iter() |
| .map(|a| a.clone().try_into()) |
| .collect::<Result< |
| Vec<protobuf::LogicalExprNode>, |
| Self::Error, |
| >>()?, |
| negated: expr.negated(), |
| }, |
| ), |
| ), |
| ), |
| }) |
| } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() { |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::Negative( |
| Box::new(protobuf::NegativeNode { |
| expr: Some(Box::new(expr.arg().to_owned().try_into()?)), |
| }), |
| )), |
| }) |
| } else if let Some(lit) = expr.downcast_ref::<Literal>() { |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::Literal( |
| lit.value().try_into()?, |
| )), |
| }) |
| } else if let Some(cast) = expr.downcast_ref::<CastExpr>() { |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::Cast(Box::new( |
| protobuf::CastNode { |
| expr: Some(Box::new(cast.expr().clone().try_into()?)), |
| arrow_type: Some(cast.cast_type().into()), |
| }, |
| ))), |
| }) |
| } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() { |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::TryCast( |
| Box::new(protobuf::TryCastNode { |
| expr: Some(Box::new(cast.expr().clone().try_into()?)), |
| arrow_type: Some(cast.cast_type().into()), |
| }), |
| )), |
| }) |
| } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() { |
| let fun: BuiltinScalarFunction = |
| BuiltinScalarFunction::from_str(expr.name())?; |
| let fun: protobuf::ScalarFunction = (&fun).try_into()?; |
| let expr: Vec<protobuf::LogicalExprNode> = expr |
| .args() |
| .iter() |
| .map(|e| e.to_owned().try_into()) |
| .collect::<Result<Vec<_>, _>>()?; |
| Ok(protobuf::LogicalExprNode { |
| expr_type: Some(protobuf::logical_expr_node::ExprType::ScalarFunction( |
| protobuf::ScalarFunctionNode { |
| fun: fun.into(), |
| expr, |
| }, |
| )), |
| }) |
| } else { |
| Err(BallistaError::General(format!( |
| "physical_plan::to_proto() unsupported expression {:?}", |
| value |
| ))) |
| } |
| } |
| } |
| |
| fn try_parse_when_then_expr( |
| when_expr: &Arc<dyn PhysicalExpr>, |
| then_expr: &Arc<dyn PhysicalExpr>, |
| ) -> Result<protobuf::WhenThen, BallistaError> { |
| Ok(protobuf::WhenThen { |
| when_expr: Some(when_expr.clone().try_into()?), |
| then_expr: Some(then_expr.clone().try_into()?), |
| }) |
| } |