Reduce DataFrame stack size and fix large futures warnings (#10123)

diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index eea5fc1..75ca93a 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -156,7 +156,8 @@
 /// ```
 #[derive(Debug, Clone)]
 pub struct DataFrame {
-    session_state: SessionState,
+    // Box the (large) SessionState to reduce the size of DataFrame on the stack
+    session_state: Box<SessionState>,
     plan: LogicalPlan,
 }
 
@@ -168,7 +169,7 @@
     /// `DataFrame` from an existing datasource.
     pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self {
         Self {
-            session_state,
+            session_state: Box::new(session_state),
             plan,
         }
     }
@@ -230,7 +231,10 @@
         };
         let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
 
-        Ok(DataFrame::new(self.session_state, project_plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan: project_plan,
+        })
     }
 
     /// Expand each list element of a column to multiple rows.
@@ -269,7 +273,10 @@
         let plan = LogicalPlanBuilder::from(self.plan)
             .unnest_column_with_options(column, options)?
             .build()?;
-        Ok(DataFrame::new(self.session_state, plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Return a DataFrame with only rows for which `predicate` evaluates to
@@ -294,7 +301,10 @@
         let plan = LogicalPlanBuilder::from(self.plan)
             .filter(predicate)?
             .build()?;
-        Ok(DataFrame::new(self.session_state, plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Return a new `DataFrame` that aggregates the rows of the current
@@ -325,7 +335,10 @@
         let plan = LogicalPlanBuilder::from(self.plan)
             .aggregate(group_expr, aggr_expr)?
             .build()?;
-        Ok(DataFrame::new(self.session_state, plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Return a new DataFrame that adds the result of evaluating one or more
@@ -334,7 +347,10 @@
         let plan = LogicalPlanBuilder::from(self.plan)
             .window(window_exprs)?
             .build()?;
-        Ok(DataFrame::new(self.session_state, plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Returns a new `DataFrame` with a limited number of rows.
@@ -359,7 +375,10 @@
         let plan = LogicalPlanBuilder::from(self.plan)
             .limit(skip, fetch)?
             .build()?;
-        Ok(DataFrame::new(self.session_state, plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows.
@@ -383,7 +402,10 @@
         let plan = LogicalPlanBuilder::from(self.plan)
             .union(dataframe.plan)?
             .build()?;
-        Ok(DataFrame::new(self.session_state, plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Calculate the distinct union of two [`DataFrame`]s.
@@ -405,12 +427,13 @@
     /// # }
     /// ```
     pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
-        Ok(DataFrame::new(
-            self.session_state,
-            LogicalPlanBuilder::from(self.plan)
-                .union_distinct(dataframe.plan)?
-                .build()?,
-        ))
+        let plan = LogicalPlanBuilder::from(self.plan)
+            .union_distinct(dataframe.plan)?
+            .build()?;
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Return a new `DataFrame` with all duplicated rows removed.
@@ -428,10 +451,11 @@
     /// # }
     /// ```
     pub fn distinct(self) -> Result<DataFrame> {
-        Ok(DataFrame::new(
-            self.session_state,
-            LogicalPlanBuilder::from(self.plan).distinct()?.build()?,
-        ))
+        let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?;
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Return a new `DataFrame` that has statistics for a DataFrame.
@@ -599,15 +623,18 @@
             describe_record_batch.schema(),
             vec![vec![describe_record_batch]],
         )?;
-        Ok(DataFrame::new(
-            self.session_state,
-            LogicalPlanBuilder::scan(
-                UNNAMED_TABLE,
-                provider_as_source(Arc::new(provider)),
-                None,
-            )?
-            .build()?,
-        ))
+
+        let plan = LogicalPlanBuilder::scan(
+            UNNAMED_TABLE,
+            provider_as_source(Arc::new(provider)),
+            None,
+        )?
+        .build()?;
+
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Sort the DataFrame by the specified sorting expressions.
@@ -633,7 +660,10 @@
     /// ```
     pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame> {
         let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
-        Ok(DataFrame::new(self.session_state, plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Join this `DataFrame` with another `DataFrame` using explicitly specified
@@ -687,7 +717,10 @@
                 filter,
             )?
             .build()?;
-        Ok(DataFrame::new(self.session_state, plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Join this `DataFrame` with another `DataFrame` using the specified
@@ -737,7 +770,10 @@
         let plan = LogicalPlanBuilder::from(self.plan)
             .join_on(right.plan, join_type, expr)?
             .build()?;
-        Ok(DataFrame::new(self.session_state, plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Repartition a DataFrame based on a logical partitioning scheme.
@@ -758,7 +794,10 @@
         let plan = LogicalPlanBuilder::from(self.plan)
             .repartition(partitioning_scheme)?
             .build()?;
-        Ok(DataFrame::new(self.session_state, plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Return the total number of rows in this `DataFrame`.
@@ -863,7 +902,7 @@
 
     /// Return a new [`TaskContext`] which would be used to execute this DataFrame
     pub fn task_ctx(&self) -> TaskContext {
-        TaskContext::from(&self.session_state)
+        TaskContext::from(self.session_state.as_ref())
     }
 
     /// Executes this DataFrame and returns a stream over a single partition
@@ -969,7 +1008,7 @@
 
     /// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`]
     pub fn into_parts(self) -> (SessionState, LogicalPlan) {
-        (self.session_state, self.plan)
+        (*self.session_state, self.plan)
     }
 
     /// Return the [`LogicalPlan`] represented by this DataFrame without running
@@ -1023,7 +1062,10 @@
         let plan = LogicalPlanBuilder::from(self.plan)
             .explain(verbose, analyze)?
             .build()?;
-        Ok(DataFrame::new(self.session_state, plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Return a `FunctionRegistry` used to plan udf's calls
@@ -1042,7 +1084,7 @@
     /// # }
     /// ```
     pub fn registry(&self) -> &dyn FunctionRegistry {
-        &self.session_state
+        self.session_state.as_ref()
     }
 
     /// Calculate the intersection of two [`DataFrame`]s.  The two [`DataFrame`]s must have exactly the same schema
@@ -1062,10 +1104,11 @@
     pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame> {
         let left_plan = self.plan;
         let right_plan = dataframe.plan;
-        Ok(DataFrame::new(
-            self.session_state,
-            LogicalPlanBuilder::intersect(left_plan, right_plan, true)?,
-        ))
+        let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?;
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Calculate the exception of two [`DataFrame`]s.  The two [`DataFrame`]s must have exactly the same schema
@@ -1085,11 +1128,11 @@
     pub fn except(self, dataframe: DataFrame) -> Result<DataFrame> {
         let left_plan = self.plan;
         let right_plan = dataframe.plan;
-
-        Ok(DataFrame::new(
-            self.session_state,
-            LogicalPlanBuilder::except(left_plan, right_plan, true)?,
-        ))
+        let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?;
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Execute this `DataFrame` and write the results to `table_name`.
@@ -1114,7 +1157,13 @@
             write_options.overwrite,
         )?
         .build()?;
-        DataFrame::new(self.session_state, plan).collect().await
+
+        DataFrame {
+            session_state: self.session_state,
+            plan,
+        }
+        .collect()
+        .await
     }
 
     /// Execute the `DataFrame` and write the results to CSV file(s).
@@ -1162,7 +1211,13 @@
             options.partition_by,
         )?
         .build()?;
-        DataFrame::new(self.session_state, plan).collect().await
+
+        DataFrame {
+            session_state: self.session_state,
+            plan,
+        }
+        .collect()
+        .await
     }
 
     /// Execute the `DataFrame` and write the results to JSON file(s).
@@ -1211,7 +1266,13 @@
             options.partition_by,
         )?
         .build()?;
-        DataFrame::new(self.session_state, plan).collect().await
+
+        DataFrame {
+            session_state: self.session_state,
+            plan,
+        }
+        .collect()
+        .await
     }
 
     /// Add an additional column to the DataFrame.
@@ -1258,7 +1319,10 @@
 
         let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?;
 
-        Ok(DataFrame::new(self.session_state, project_plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan: project_plan,
+        })
     }
 
     /// Rename one column by applying a new projection. This is a no-op if the column to be
@@ -1322,7 +1386,10 @@
         let project_plan = LogicalPlanBuilder::from(self.plan)
             .project(projection)?
             .build()?;
-        Ok(DataFrame::new(self.session_state, project_plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan: project_plan,
+        })
     }
 
     /// Replace all parameters in logical plan with the specified
@@ -1384,7 +1451,10 @@
     /// ```
     pub fn with_param_values(self, query_values: impl Into<ParamValues>) -> Result<Self> {
         let plan = self.plan.with_param_values(query_values)?;
-        Ok(Self::new(self.session_state, plan))
+        Ok(DataFrame {
+            session_state: self.session_state,
+            plan,
+        })
     }
 
     /// Cache DataFrame as a memory table.
@@ -1401,7 +1471,7 @@
     /// # }
     /// ```
     pub async fn cache(self) -> Result<DataFrame> {
-        let context = SessionContext::new_with_state(self.session_state.clone());
+        let context = SessionContext::new_with_state((*self.session_state).clone());
         // The schema is consistent with the output
         let plan = self.clone().create_physical_plan().await?;
         let schema = plan.schema();
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 7cc3201..0ec46df 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -68,7 +68,12 @@
             options.partition_by,
         )?
         .build()?;
-        DataFrame::new(self.session_state, plan).collect().await
+        DataFrame {
+            session_state: self.session_state,
+            plan,
+        }
+        .collect()
+        .await
     }
 }
 
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 31f3906..8966206 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -466,24 +466,37 @@
     /// [`SQLOptions::verify_plan`].
     pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<DataFrame> {
         match plan {
-            LogicalPlan::Ddl(ddl) => match ddl {
-                DdlStatement::CreateExternalTable(cmd) => {
-                    self.create_external_table(&cmd).await
+            LogicalPlan::Ddl(ddl) => {
+                // Box::pin moves each DDL future to the heap rather than embedding it in
+                // this function's own future, which keeps that future small (avoiding the
+                // large-futures warning) and decreases the risk of stack overflows.
+                match ddl {
+                    DdlStatement::CreateExternalTable(cmd) => {
+                        Box::pin(async move { self.create_external_table(&cmd).await })
+                            as std::pin::Pin<Box<dyn futures::Future<Output = _> + Send>>
+                    }
+                    DdlStatement::CreateMemoryTable(cmd) => {
+                        Box::pin(self.create_memory_table(cmd))
+                    }
+                    DdlStatement::CreateView(cmd) => Box::pin(self.create_view(cmd)),
+                    DdlStatement::CreateCatalogSchema(cmd) => {
+                        Box::pin(self.create_catalog_schema(cmd))
+                    }
+                    DdlStatement::CreateCatalog(cmd) => {
+                        Box::pin(self.create_catalog(cmd))
+                    }
+                    DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)),
+                    DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)),
+                    DdlStatement::DropCatalogSchema(cmd) => {
+                        Box::pin(self.drop_schema(cmd))
+                    }
+                    DdlStatement::CreateFunction(cmd) => {
+                        Box::pin(self.create_function(cmd))
+                    }
+                    DdlStatement::DropFunction(cmd) => Box::pin(self.drop_function(cmd)),
                 }
-                DdlStatement::CreateMemoryTable(cmd) => {
-                    self.create_memory_table(cmd).await
-                }
-                DdlStatement::CreateView(cmd) => self.create_view(cmd).await,
-                DdlStatement::CreateCatalogSchema(cmd) => {
-                    self.create_catalog_schema(cmd).await
-                }
-                DdlStatement::CreateCatalog(cmd) => self.create_catalog(cmd).await,
-                DdlStatement::DropTable(cmd) => self.drop_table(cmd).await,
-                DdlStatement::DropView(cmd) => self.drop_view(cmd).await,
-                DdlStatement::DropCatalogSchema(cmd) => self.drop_schema(cmd).await,
-                DdlStatement::CreateFunction(cmd) => self.create_function(cmd).await,
-                DdlStatement::DropFunction(cmd) => self.drop_function(cmd).await,
-            },
+                .await
+            }
             // TODO what about the other statements (like TransactionStart and TransactionEnd)
             LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
                 self.set_variable(stmt).await