| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! This example demonstrates how to use TrackConsumersPool for memory tracking and debugging. |
| //! |
| //! The TrackConsumersPool provides enhanced error messages that show the top memory consumers |
| //! when memory allocation fails, making it easier to debug memory issues in DataFusion queries. |
| //! |
| //! # Examples |
| //! |
| //! * [`automatic_usage_example`]: Shows how to use RuntimeEnvBuilder to automatically enable memory tracking |
| |
| use datafusion::execution::runtime_env::RuntimeEnvBuilder; |
| use datafusion::prelude::*; |
| |
| #[tokio::main] |
| async fn main() -> Result<(), Box<dyn std::error::Error>> { |
| println!("=== DataFusion Memory Pool Tracking Example ===\n"); |
| |
| // Example 1: Automatic Usage with RuntimeEnvBuilder |
| automatic_usage_example().await?; |
| |
| Ok(()) |
| } |
| |
| /// Example 1: Automatic Usage with RuntimeEnvBuilder |
| /// |
| /// This shows the recommended way to use TrackConsumersPool through RuntimeEnvBuilder, |
| /// which automatically creates a TrackConsumersPool with sensible defaults. |
| async fn automatic_usage_example() -> datafusion::error::Result<()> { |
| println!("Example 1: Automatic Usage with RuntimeEnvBuilder"); |
| println!("------------------------------------------------"); |
| |
| // Success case: Create a runtime with reasonable memory limit |
| println!("Success case: Normal operation with sufficient memory"); |
| let runtime = RuntimeEnvBuilder::new() |
| .with_memory_limit(5_000_000, 1.0) // 5MB, 100% utilization |
| .build_arc()?; |
| |
| let config = SessionConfig::new(); |
| let ctx = SessionContext::new_with_config_rt(config, runtime); |
| |
| // Create a simple table for demonstration |
| ctx.sql("CREATE TABLE test AS VALUES (1, 'a'), (2, 'b'), (3, 'c')") |
| .await? |
| .collect() |
| .await?; |
| |
| println!("✓ Created table with memory tracking enabled"); |
| |
| // Run a simple query to show it works |
| let results = ctx.sql("SELECT * FROM test").await?.collect().await?; |
| println!( |
| "✓ Query executed successfully. Found {} rows", |
| results.len() |
| ); |
| |
| println!("\n{}", "-".repeat(50)); |
| |
| // Error case: Create a runtime with low memory limit to trigger errors |
| println!("Error case: Triggering memory limit error with detailed error messages"); |
| |
| // Use a WITH query that generates data and then processes it to trigger memory usage |
| match ctx.sql(" |
| WITH large_dataset AS ( |
| SELECT |
| column1 as id, |
| column1 * 2 as doubled, |
| repeat('data_', 20) || column1 as text_field, |
| column1 * column1 as squared |
| FROM generate_series(1, 2000) as t(column1) |
| ), |
| aggregated AS ( |
| SELECT |
| id, |
| doubled, |
| text_field, |
| squared, |
| sum(doubled) OVER (ORDER BY id ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) as running_sum |
| FROM large_dataset |
| ) |
| SELECT |
| a1.id, |
| a1.text_field, |
| a2.text_field as text_field2, |
| a1.running_sum + a2.running_sum as combined_sum |
| FROM aggregated a1 |
| JOIN aggregated a2 ON a1.id = a2.id - 1 |
| ORDER BY a1.id |
| ").await?.collect().await { |
| Ok(results) => panic!("Should not succeed! Yet got {} batches", results.len()), |
| Err(e) => { |
| println!("✓ Expected memory limit error during data processing:"); |
| println!("Error: {e}"); |
| /* Example error message: |
| Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes |
| caused by |
| Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: |
| ExternalSorterMerge[3]#112(can spill: false) consumed 10.0 MB, peak 10.0 MB, |
| ExternalSorterMerge[10]#147(can spill: false) consumed 10.0 MB, peak 10.0 MB, |
| ExternalSorter[1]#93(can spill: true) consumed 69.0 KB, peak 69.0 KB, |
| ExternalSorter[13]#155(can spill: true) consumed 67.6 KB, peak 67.6 KB, |
| ExternalSorter[8]#140(can spill: true) consumed 67.2 KB, peak 67.2 KB. |
| Error: Failed to allocate additional 10.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated for this reservation - 7.1 MB remain available for the total pool |
| */ |
| } |
| } |
| |
| println!("\nNote: The error message above shows which memory consumers"); |
| println!("were using the most memory when the limit was exceeded."); |
| |
| Ok(()) |
| } |