ARROW-12170: [Rust][DataFusion] Introduce repartition optimization

This introduces an optimization pass that adds a repartition operator whenever the number of partitions in the plan drops below the configured concurrency, increasing the amount of achievable parallelism.

This PR moves the optimizations into a separate `PhysicalOptimizer`, so they can be extended and built upon later.
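The core idea can be sketched with a toy plan model. This is an illustrative simplification, not DataFusion's actual API: the `PlanNode` type, `output_partitions` method, and `optimize` function below are hypothetical stand-ins for the real `ExecutionPlan` and optimizer-rule machinery.

```rust
// Toy model of a physical plan: a scan with some number of partitions,
// optionally wrapped in a repartition node. (Hypothetical types, not
// DataFusion's real API.)
#[derive(Debug)]
enum PlanNode {
    Scan { partitions: usize },
    Repartition { target: usize, input: Box<PlanNode> },
}

impl PlanNode {
    /// Number of partitions this node produces.
    fn output_partitions(&self) -> usize {
        match self {
            PlanNode::Scan { partitions } => *partitions,
            PlanNode::Repartition { target, .. } => *target,
        }
    }
}

/// The pass in miniature: if the plan exposes fewer partitions than the
/// configured concurrency, wrap it in a repartition up to that concurrency;
/// otherwise leave it untouched.
fn optimize(plan: PlanNode, concurrency: usize) -> PlanNode {
    if plan.output_partitions() < concurrency {
        PlanNode::Repartition {
            target: concurrency,
            input: Box::new(plan),
        }
    } else {
        plan
    }
}

fn main() {
    // A single-partition scan (e.g. one file loaded into memory)...
    let plan = PlanNode::Scan { partitions: 1 };
    // ...is repartitioned up to the configured concurrency of 16.
    let optimized = optimize(plan, 16);
    println!("partitions after optimization: {}", optimized.output_partitions());
}
```

In the real pass, the repartition uses a round-robin scheme so downstream operators can run one task per partition.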

The performance benefit is clearest when data is loaded into memory as a single partition: a single file or in-memory data may offer high enough throughput, but the single partition allows too little parallelism.
For those queries, this has a benefit similar to pre-partitioning the data before loading it into memory.

```
cargo run --release --bin tpch --features "snmalloc" -- benchmark --iterations 30 --path [path] --format parquet --query 1 --batch-size 8192 --concurrency 16 -m -n 1
```

Master

```
Query 1 avg time: 411.57 ms
Query 3 avg time: 147.32 ms
Query 5 avg time: 237.62 ms
Query 6 avg time: 46.00 ms
Query 12 avg time: 124.02 ms
```

PR
```
Query 1 avg time: 76.37 ms
Query 3 avg time: 67.51 ms
Query 5 avg time: 134.14 ms
Query 6 avg time: 9.58 ms
Query 12 avg time: 20.60 ms
```
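The per-query speedup follows directly from the two runs above as the ratio of the master time to the PR time:

```rust
fn main() {
    // Average times (ms) from the benchmark runs above.
    let queries = [1, 3, 5, 6, 12];
    let master = [411.57_f64, 147.32, 237.62, 46.00, 124.02];
    let pr = [76.37_f64, 67.51, 134.14, 9.58, 20.60];

    for ((q, m), p) in queries.iter().zip(master.iter()).zip(pr.iter()) {
        // Speedup = master time / PR time.
        println!("Query {}: {:.1}x", q, m / p);
    }
    // Query 1: 5.4x, Query 3: 2.2x, Query 5: 1.8x,
    // Query 6: 4.8x, Query 12: 6.0x
}
```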

All in all, this is looking good: we observe speedups of up to 6x in this test!

Closes #9865 from Dandandan/reparition-opt

Lead-authored-by: Heres, Daniel <danielheres@gmail.com>
Co-authored-by: Daniël Heres <danielheres@gmail.com>
Signed-off-by: Andrew Lamb <andrew@nerdnetworks.org>
13 files changed