Reduce DataFrame stack size and fix large futures warnings (#10123)
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index eea5fc1..75ca93a 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -156,7 +156,8 @@
/// ```
#[derive(Debug, Clone)]
pub struct DataFrame {
- session_state: SessionState,
+ // Box the (large) SessionState to reduce the size of DataFrame on the stack
+ session_state: Box<SessionState>,
plan: LogicalPlan,
}
@@ -168,7 +169,7 @@
/// `DataFrame` from an existing datasource.
pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self {
Self {
- session_state,
+ session_state: Box::new(session_state),
plan,
}
}
@@ -230,7 +231,10 @@
};
let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
- Ok(DataFrame::new(self.session_state, project_plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan: project_plan,
+ })
}
/// Expand each list element of a column to multiple rows.
@@ -269,7 +273,10 @@
let plan = LogicalPlanBuilder::from(self.plan)
.unnest_column_with_options(column, options)?
.build()?;
- Ok(DataFrame::new(self.session_state, plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Return a DataFrame with only rows for which `predicate` evaluates to
@@ -294,7 +301,10 @@
let plan = LogicalPlanBuilder::from(self.plan)
.filter(predicate)?
.build()?;
- Ok(DataFrame::new(self.session_state, plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Return a new `DataFrame` that aggregates the rows of the current
@@ -325,7 +335,10 @@
let plan = LogicalPlanBuilder::from(self.plan)
.aggregate(group_expr, aggr_expr)?
.build()?;
- Ok(DataFrame::new(self.session_state, plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Return a new DataFrame that adds the result of evaluating one or more
@@ -334,7 +347,10 @@
let plan = LogicalPlanBuilder::from(self.plan)
.window(window_exprs)?
.build()?;
- Ok(DataFrame::new(self.session_state, plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Returns a new `DataFrame` with a limited number of rows.
@@ -359,7 +375,10 @@
let plan = LogicalPlanBuilder::from(self.plan)
.limit(skip, fetch)?
.build()?;
- Ok(DataFrame::new(self.session_state, plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Calculate the union of two [`DataFrame`]s, preserving duplicate rows.
@@ -383,7 +402,10 @@
let plan = LogicalPlanBuilder::from(self.plan)
.union(dataframe.plan)?
.build()?;
- Ok(DataFrame::new(self.session_state, plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Calculate the distinct union of two [`DataFrame`]s.
@@ -405,12 +427,13 @@
/// # }
/// ```
pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
- Ok(DataFrame::new(
- self.session_state,
- LogicalPlanBuilder::from(self.plan)
- .union_distinct(dataframe.plan)?
- .build()?,
- ))
+ let plan = LogicalPlanBuilder::from(self.plan)
+ .union_distinct(dataframe.plan)?
+ .build()?;
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Return a new `DataFrame` with all duplicated rows removed.
@@ -428,10 +451,11 @@
/// # }
/// ```
pub fn distinct(self) -> Result<DataFrame> {
- Ok(DataFrame::new(
- self.session_state,
- LogicalPlanBuilder::from(self.plan).distinct()?.build()?,
- ))
+ let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?;
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Return a new `DataFrame` that has statistics for a DataFrame.
@@ -599,15 +623,18 @@
describe_record_batch.schema(),
vec![vec![describe_record_batch]],
)?;
- Ok(DataFrame::new(
- self.session_state,
- LogicalPlanBuilder::scan(
- UNNAMED_TABLE,
- provider_as_source(Arc::new(provider)),
- None,
- )?
- .build()?,
- ))
+
+ let plan = LogicalPlanBuilder::scan(
+ UNNAMED_TABLE,
+ provider_as_source(Arc::new(provider)),
+ None,
+ )?
+ .build()?;
+
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Sort the DataFrame by the specified sorting expressions.
@@ -633,7 +660,10 @@
/// ```
pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
- Ok(DataFrame::new(self.session_state, plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Join this `DataFrame` with another `DataFrame` using explicitly specified
@@ -687,7 +717,10 @@
filter,
)?
.build()?;
- Ok(DataFrame::new(self.session_state, plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Join this `DataFrame` with another `DataFrame` using the specified
@@ -737,7 +770,10 @@
let plan = LogicalPlanBuilder::from(self.plan)
.join_on(right.plan, join_type, expr)?
.build()?;
- Ok(DataFrame::new(self.session_state, plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Repartition a DataFrame based on a logical partitioning scheme.
@@ -758,7 +794,10 @@
let plan = LogicalPlanBuilder::from(self.plan)
.repartition(partitioning_scheme)?
.build()?;
- Ok(DataFrame::new(self.session_state, plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Return the total number of rows in this `DataFrame`.
@@ -863,7 +902,7 @@
/// Return a new [`TaskContext`] which would be used to execute this DataFrame
pub fn task_ctx(&self) -> TaskContext {
- TaskContext::from(&self.session_state)
+ TaskContext::from(self.session_state.as_ref())
}
/// Executes this DataFrame and returns a stream over a single partition
@@ -969,7 +1008,7 @@
/// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`]
pub fn into_parts(self) -> (SessionState, LogicalPlan) {
- (self.session_state, self.plan)
+ (*self.session_state, self.plan)
}
/// Return the [`LogicalPlan`] represented by this DataFrame without running
@@ -1023,7 +1062,10 @@
let plan = LogicalPlanBuilder::from(self.plan)
.explain(verbose, analyze)?
.build()?;
- Ok(DataFrame::new(self.session_state, plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Return a `FunctionRegistry` used to plan udf's calls
@@ -1042,7 +1084,7 @@
/// # }
/// ```
pub fn registry(&self) -> &dyn FunctionRegistry {
- &self.session_state
+ self.session_state.as_ref()
}
/// Calculate the intersection of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema
@@ -1062,10 +1104,11 @@
pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame> {
let left_plan = self.plan;
let right_plan = dataframe.plan;
- Ok(DataFrame::new(
- self.session_state,
- LogicalPlanBuilder::intersect(left_plan, right_plan, true)?,
- ))
+ let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?;
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Calculate the exception of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema
@@ -1085,11 +1128,11 @@
pub fn except(self, dataframe: DataFrame) -> Result<DataFrame> {
let left_plan = self.plan;
let right_plan = dataframe.plan;
-
- Ok(DataFrame::new(
- self.session_state,
- LogicalPlanBuilder::except(left_plan, right_plan, true)?,
- ))
+ let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?;
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Execute this `DataFrame` and write the results to `table_name`.
@@ -1114,7 +1157,13 @@
write_options.overwrite,
)?
.build()?;
- DataFrame::new(self.session_state, plan).collect().await
+
+ DataFrame {
+ session_state: self.session_state,
+ plan,
+ }
+ .collect()
+ .await
}
/// Execute the `DataFrame` and write the results to CSV file(s).
@@ -1162,7 +1211,13 @@
options.partition_by,
)?
.build()?;
- DataFrame::new(self.session_state, plan).collect().await
+
+ DataFrame {
+ session_state: self.session_state,
+ plan,
+ }
+ .collect()
+ .await
}
/// Execute the `DataFrame` and write the results to JSON file(s).
@@ -1211,7 +1266,13 @@
options.partition_by,
)?
.build()?;
- DataFrame::new(self.session_state, plan).collect().await
+
+ DataFrame {
+ session_state: self.session_state,
+ plan,
+ }
+ .collect()
+ .await
}
/// Add an additional column to the DataFrame.
@@ -1258,7 +1319,10 @@
let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?;
- Ok(DataFrame::new(self.session_state, project_plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan: project_plan,
+ })
}
/// Rename one column by applying a new projection. This is a no-op if the column to be
@@ -1322,7 +1386,10 @@
let project_plan = LogicalPlanBuilder::from(self.plan)
.project(projection)?
.build()?;
- Ok(DataFrame::new(self.session_state, project_plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan: project_plan,
+ })
}
/// Replace all parameters in logical plan with the specified
@@ -1384,7 +1451,10 @@
/// ```
pub fn with_param_values(self, query_values: impl Into<ParamValues>) -> Result<Self> {
let plan = self.plan.with_param_values(query_values)?;
- Ok(Self::new(self.session_state, plan))
+ Ok(DataFrame {
+ session_state: self.session_state,
+ plan,
+ })
}
/// Cache DataFrame as a memory table.
@@ -1401,7 +1471,7 @@
/// # }
/// ```
pub async fn cache(self) -> Result<DataFrame> {
- let context = SessionContext::new_with_state(self.session_state.clone());
+ let context = SessionContext::new_with_state((*self.session_state).clone());
// The schema is consistent with the output
let plan = self.clone().create_physical_plan().await?;
let schema = plan.schema();
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 7cc3201..0ec46df 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -68,7 +68,12 @@
options.partition_by,
)?
.build()?;
- DataFrame::new(self.session_state, plan).collect().await
+ DataFrame {
+ session_state: self.session_state,
+ plan,
+ }
+ .collect()
+ .await
}
}
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 31f3906..8966206 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -466,24 +466,37 @@
/// [`SQLOptions::verify_plan`].
pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<DataFrame> {
match plan {
- LogicalPlan::Ddl(ddl) => match ddl {
- DdlStatement::CreateExternalTable(cmd) => {
- self.create_external_table(&cmd).await
+ LogicalPlan::Ddl(ddl) => {
+ // Box::pin each DDL future so its state is heap-allocated rather than stored
+ // inline in the future returned by this function, keeping that future small
+ // and decreasing the risk of stack overflows.
+ match ddl {
+ DdlStatement::CreateExternalTable(cmd) => {
+ Box::pin(async move { self.create_external_table(&cmd).await })
+ as std::pin::Pin<Box<dyn futures::Future<Output = _> + Send>>
+ }
+ DdlStatement::CreateMemoryTable(cmd) => {
+ Box::pin(self.create_memory_table(cmd))
+ }
+ DdlStatement::CreateView(cmd) => Box::pin(self.create_view(cmd)),
+ DdlStatement::CreateCatalogSchema(cmd) => {
+ Box::pin(self.create_catalog_schema(cmd))
+ }
+ DdlStatement::CreateCatalog(cmd) => {
+ Box::pin(self.create_catalog(cmd))
+ }
+ DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)),
+ DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)),
+ DdlStatement::DropCatalogSchema(cmd) => {
+ Box::pin(self.drop_schema(cmd))
+ }
+ DdlStatement::CreateFunction(cmd) => {
+ Box::pin(self.create_function(cmd))
+ }
+ DdlStatement::DropFunction(cmd) => Box::pin(self.drop_function(cmd)),
}
- DdlStatement::CreateMemoryTable(cmd) => {
- self.create_memory_table(cmd).await
- }
- DdlStatement::CreateView(cmd) => self.create_view(cmd).await,
- DdlStatement::CreateCatalogSchema(cmd) => {
- self.create_catalog_schema(cmd).await
- }
- DdlStatement::CreateCatalog(cmd) => self.create_catalog(cmd).await,
- DdlStatement::DropTable(cmd) => self.drop_table(cmd).await,
- DdlStatement::DropView(cmd) => self.drop_view(cmd).await,
- DdlStatement::DropCatalogSchema(cmd) => self.drop_schema(cmd).await,
- DdlStatement::CreateFunction(cmd) => self.create_function(cmd).await,
- DdlStatement::DropFunction(cmd) => self.drop_function(cmd).await,
- },
+ .await
+ }
// TODO what about the other statements (like TransactionStart and TransactionEnd)
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
self.set_variable(stmt).await