| <!--- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| # Extending DataFusion's operators: custom LogicalPlan and Execution Plans |
| |
| DataFusion supports extension of operators by transforming logical plan and execution plan through customized [optimizer rules](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html). This section will use the µWheel project to illustrate such capabilities. |
| |
| ## About DataFusion µWheel |
| |
| [DataFusion µWheel](https://github.com/uwheel/datafusion-uwheel/tree/main) is a native DataFusion optimizer which improves query performance for time-based analytics through fast temporal aggregation and pruning using custom indices. The integration of µWheel into DataFusion is a joint effort with the DataFusion community. |
| |
| ### Optimizing Logical Plan |
| |
| The `rewrite` function transforms logical plans by identifying temporal patterns and aggregation functions that match the stored wheel indices. When match is found, it queries the corresponding index to retrieve pre-computed aggregate values, stores these results in a [MemTable](https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html), and returns as a new `LogicalPlan::TableScan`. If no match is found, the original plan proceeds unchanged through DataFusion's standard execution path. |
| |
| ```rust,ignore |
| fn rewrite( |
| &self, |
| plan: LogicalPlan, |
| _config: &dyn OptimizerConfig, |
| ) -> Result<Transformed<LogicalPlan>> { |
| // Attempts to rewrite a logical plan to a uwheel-based plan that either provides |
| // plan-time aggregates or skips execution based on min/max pruning. |
| if let Some(rewritten) = self.try_rewrite(&plan) { |
| Ok(Transformed::yes(rewritten)) |
| } else { |
| Ok(Transformed::no(plan)) |
| } |
| } |
| ``` |
| |
| ```rust,ignore |
| // Converts a uwheel aggregate result to a TableScan with a MemTable as source |
| fn agg_to_table_scan(result: f64, schema: SchemaRef) -> Result<LogicalPlan> { |
| let data = Float64Array::from(vec![result]); |
| let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)])?; |
| let df_schema = Arc::new(DFSchema::try_from(schema.clone())?); |
| let mem_table = MemTable::try_new(schema, vec![vec![record_batch]])?; |
| mem_table_as_table_scan(mem_table, df_schema) |
| } |
| ``` |
| |
| To get a deeper dive into the usage of the µWheel project, visit the [blog post](https://uwheel.rs/post/datafusion_uwheel/) by Max Meldrum. |