blob: 35cf766ba1afe6c4cb1759aee149a51224c33402 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::physical_planner::DefaultPhysicalPlanner;
use datafusion::prelude::*;
use datafusion_expr::{LogicalPlan, PlanType};
/// This example demonstrates the process of converting logical plan
/// into physical execution plans using DataFusion.
///
/// Planning phase in DataFusion contains several steps:
/// 1. Analyzing and optimizing logical plan
/// 2. Converting logical plan into physical plan
///
/// The code in this example shows two ways to convert a logical plan into
/// physical plan:
/// - Via the combined `create_physical_plan` API.
/// - Utilizing the analyzer, optimizer, and query planner APIs separately.
#[tokio::main]
async fn main() -> Result<()> {
// Set up a DataFusion context and load a Parquet file
let ctx = SessionContext::new();
let testdata = datafusion::test_util::parquet_test_data();
let df = ctx
.read_parquet(
&format!("{testdata}/alltypes_plain.parquet"),
ParquetReadOptions::default(),
)
.await?;
// Construct the input logical plan using DataFrame API
let df = df
.clone()
.select(vec![
df.parse_sql_expr("int_col")?,
df.parse_sql_expr("double_col")?,
])?
.filter(df.parse_sql_expr("int_col < 5 OR double_col = 8.0")?)?
.aggregate(
vec![df.parse_sql_expr("double_col")?],
vec![df.parse_sql_expr("SUM(int_col) as sum_int_col")?],
)?
.limit(0, Some(1))?;
let logical_plan = df.logical_plan().clone();
to_physical_plan_in_one_api_demo(&logical_plan, &ctx).await?;
to_physical_plan_step_by_step_demo(logical_plan, &ctx).await?;
Ok(())
}
/// Converts a logical plan into a physical plan using the combined
/// `create_physical_plan` API. It will first optimize the logical
/// plan and then convert it into physical plan.
async fn to_physical_plan_in_one_api_demo(
input: &LogicalPlan,
ctx: &SessionContext,
) -> Result<()> {
let physical_plan = ctx.state().create_physical_plan(input).await?;
println!(
"Physical plan direct from logical plan:\n\n{}\n\n",
displayable(physical_plan.as_ref())
.to_stringified(false, PlanType::InitialPhysicalPlan)
.plan
);
Ok(())
}
/// Converts a logical plan into a physical plan by utilizing the analyzer,
/// optimizer, and query planner APIs separately. This flavor gives more
/// control over the planning process.
async fn to_physical_plan_step_by_step_demo(
input: LogicalPlan,
ctx: &SessionContext,
) -> Result<()> {
// First analyze the logical plan
let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
input,
ctx.state().config_options(),
|_, _| (),
)?;
println!("Analyzed logical plan:\n\n{:?}\n\n", analyzed_logical_plan);
// Optimize the analyzed logical plan
let optimized_logical_plan = ctx.state().optimizer().optimize(
analyzed_logical_plan,
&ctx.state(),
|_, _| (),
)?;
println!(
"Optimized logical plan:\n\n{:?}\n\n",
optimized_logical_plan
);
// Create the physical plan
let physical_plan = ctx
.state()
.query_planner()
.create_physical_plan(&optimized_logical_plan, &ctx.state())
.await?;
println!(
"Final physical plan:\n\n{}\n\n",
displayable(physical_plan.as_ref())
.to_stringified(false, PlanType::InitialPhysicalPlan)
.plan
);
// Call the physical optimizer with an existing physical plan (in this
// case the plan is already optimized, but an unoptimized plan would
// typically be used in this context)
// Note that this is not part of the trait but a public method
// on DefaultPhysicalPlanner. Not all planners will provide this feature.
let planner = DefaultPhysicalPlanner::default();
let physical_plan =
planner.optimize_physical_plan(physical_plan, &ctx.state(), |_, _| {})?;
println!(
"Optimized physical plan:\n\n{}\n\n",
displayable(physical_plan.as_ref())
.to_stringified(false, PlanType::InitialPhysicalPlan)
.plan
);
Ok(())
}