Upgrade to DataFusion v42 / Arrow v53 / Arrow Java v16
diff --git a/Cargo.lock b/Cargo.lock
index d35ba60..573e319 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4,9 +4,9 @@
[[package]]
name = "addr2line"
-version = "0.24.1"
+version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375"
+checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
dependencies = [
"gimli",
]
@@ -78,9 +78,9 @@
[[package]]
name = "anyhow"
-version = "1.0.88"
+version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e1496f8fb1fbf272686b8d37f523dab3e4a7443300055e74cdaa449f3114356"
+checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
[[package]]
name = "arrayref"
@@ -96,8 +96,8 @@
[[package]]
name = "arrow"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -116,8 +116,8 @@
[[package]]
name = "arrow-arith"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -130,24 +130,24 @@
[[package]]
name = "arrow-array"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"ahash",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"chrono",
- "chrono-tz 0.8.6",
+ "chrono-tz",
"half",
- "hashbrown",
+ "hashbrown 0.14.5",
"num",
]
[[package]]
name = "arrow-buffer"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"bytes",
"half",
@@ -156,26 +156,28 @@
[[package]]
name = "arrow-cast"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-select",
- "base64 0.21.7",
+ "atoi",
+ "base64",
"chrono",
"comfy-table",
"half",
"lexical-core",
"num",
+ "ryu",
]
[[package]]
name = "arrow-csv"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -192,8 +194,8 @@
[[package]]
name = "arrow-data"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"arrow-buffer",
"arrow-schema",
@@ -203,8 +205,8 @@
[[package]]
name = "arrow-ipc"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -217,8 +219,8 @@
[[package]]
name = "arrow-json"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -236,8 +238,8 @@
[[package]]
name = "arrow-ord"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -250,8 +252,8 @@
[[package]]
name = "arrow-row"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"ahash",
"arrow-array",
@@ -259,13 +261,12 @@
"arrow-data",
"arrow-schema",
"half",
- "hashbrown",
]
[[package]]
name = "arrow-schema"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"bitflags 2.6.0",
"serde",
@@ -273,8 +274,8 @@
[[package]]
name = "arrow-select"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"ahash",
"arrow-array",
@@ -286,14 +287,15 @@
[[package]]
name = "arrow-string"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-select",
+ "memchr",
"num",
"regex",
"regex-syntax",
@@ -301,9 +303,9 @@
[[package]]
name = "async-compression"
-version = "0.4.12"
+version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fec134f64e2bc57411226dfc4e52dec859ddfc7e711fc5e07b612584f000e4aa"
+checksum = "7e614738943d3f68c628ae3dbce7c3daffb196665f82f8c8ea6b65de73c79429"
dependencies = [
"bzip2",
"flate2",
@@ -325,14 +327,23 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
+]
+
+[[package]]
+name = "atoi"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528"
+dependencies = [
+ "num-traits",
]
[[package]]
name = "autocfg"
-version = "1.3.0"
+version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
+checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "backtrace"
@@ -351,12 +362,6 @@
[[package]]
name = "base64"
-version = "0.21.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
-
-[[package]]
-name = "base64"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
@@ -457,7 +462,7 @@
version = "0.1.0"
dependencies = [
"arrow",
- "base64 0.22.1",
+ "base64",
"datafusion",
"datafusion-ext-commons",
"datafusion-ext-exprs",
@@ -480,9 +485,9 @@
[[package]]
name = "brotli"
-version = "3.5.0"
+version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391"
+checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
@@ -491,9 +496,9 @@
[[package]]
name = "brotli-decompressor"
-version = "2.5.1"
+version = "4.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f"
+checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
@@ -546,9 +551,9 @@
[[package]]
name = "cc"
-version = "1.1.18"
+version = "1.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b62ac837cdb5cb22e10a256099b4fc502b1dfe560cb282963a974d7abd80e476"
+checksum = "2e80e3b6a3ab07840e1cae9b0666a63970dc28e8ed5ffbcdacbfc760c281bfc1"
dependencies = [
"jobserver",
"libc",
@@ -581,39 +586,17 @@
[[package]]
name = "chrono-tz"
-version = "0.8.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e"
-dependencies = [
- "chrono",
- "chrono-tz-build 0.2.1",
- "phf",
-]
-
-[[package]]
-name = "chrono-tz"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb"
dependencies = [
"chrono",
- "chrono-tz-build 0.3.0",
+ "chrono-tz-build",
"phf",
]
[[package]]
name = "chrono-tz-build"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f"
-dependencies = [
- "parse-zoneinfo",
- "phf",
- "phf_codegen",
-]
-
-[[package]]
-name = "chrono-tz-build"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1"
@@ -701,6 +684,12 @@
]
[[package]]
+name = "crossbeam-utils"
+version = "0.8.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
+
+[[package]]
name = "crunchy"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -739,12 +728,13 @@
[[package]]
name = "dashmap"
-version = "5.5.3"
+version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
+checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
- "hashbrown",
+ "crossbeam-utils",
+ "hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
@@ -752,8 +742,8 @@
[[package]]
name = "datafusion"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
dependencies = [
"ahash",
"arrow",
@@ -766,27 +756,34 @@
"bzip2",
"chrono",
"dashmap",
+ "datafusion-catalog",
"datafusion-common",
+ "datafusion-common-runtime",
"datafusion-execution",
"datafusion-expr",
"datafusion-functions",
- "datafusion-functions-array",
+ "datafusion-functions-aggregate",
+ "datafusion-functions-nested",
+ "datafusion-functions-window",
"datafusion-optimizer",
"datafusion-physical-expr",
+ "datafusion-physical-expr-common",
+ "datafusion-physical-optimizer",
"datafusion-physical-plan",
"datafusion-sql",
"flate2",
"futures",
"glob",
"half",
- "hashbrown",
+ "hashbrown 0.14.5",
"indexmap",
- "itertools 0.12.1",
+ "itertools 0.13.0",
"log",
"num_cpus",
"object_store",
"parking_lot",
"parquet",
+ "paste",
"pin-project-lite",
"rand",
"sqlparser",
@@ -800,9 +797,23 @@
]
[[package]]
+name = "datafusion-catalog"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "arrow-schema",
+ "async-trait",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-physical-plan",
+ "parking_lot",
+]
+
+[[package]]
name = "datafusion-common"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
dependencies = [
"ahash",
"arrow",
@@ -811,17 +822,30 @@
"arrow-schema",
"chrono",
"half",
+ "hashbrown 0.14.5",
+ "instant",
"libc",
"num_cpus",
"object_store",
"parquet",
+ "paste",
"sqlparser",
+ "tokio",
+]
+
+[[package]]
+name = "datafusion-common-runtime"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "log",
+ "tokio",
]
[[package]]
name = "datafusion-execution"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
dependencies = [
"arrow",
"chrono",
@@ -829,7 +853,7 @@
"datafusion-common",
"datafusion-expr",
"futures",
- "hashbrown",
+ "hashbrown 0.14.5",
"log",
"object_store",
"parking_lot",
@@ -840,20 +864,36 @@
[[package]]
name = "datafusion-expr"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
dependencies = [
"ahash",
"arrow",
"arrow-array",
+ "arrow-buffer",
+ "chrono",
"datafusion-common",
+ "datafusion-expr-common",
+ "datafusion-functions-aggregate-common",
+ "datafusion-physical-expr-common",
"paste",
+ "serde_json",
"sqlparser",
"strum",
"strum_macros",
]
[[package]]
+name = "datafusion-expr-common"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "arrow",
+ "datafusion-common",
+ "paste",
+]
+
+[[package]]
name = "datafusion-ext-commons"
version = "0.1.0"
dependencies = [
@@ -925,7 +965,7 @@
dependencies = [
"arrow",
"async-trait",
- "base64 0.22.1",
+ "base64",
"bitvec",
"blaze-jni-bridge",
"byteorder",
@@ -940,7 +980,7 @@
"futures",
"futures-util",
"gxhash",
- "hashbrown",
+ "hashbrown 0.14.5",
"itertools 0.13.0",
"jni",
"log",
@@ -952,6 +992,7 @@
"panic-message",
"parking_lot",
"paste",
+ "rand",
"slimmer_box",
"smallvec",
"tempfile",
@@ -963,76 +1004,23 @@
[[package]]
name = "datafusion-functions"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
dependencies = [
"arrow",
- "base64 0.21.7",
- "datafusion-common",
- "datafusion-execution",
- "datafusion-expr",
- "hex",
- "log",
-]
-
-[[package]]
-name = "datafusion-functions-array"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
-dependencies = [
- "arrow",
- "datafusion-common",
- "datafusion-execution",
- "datafusion-expr",
- "log",
- "paste",
-]
-
-[[package]]
-name = "datafusion-optimizer"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
-dependencies = [
- "arrow",
- "async-trait",
- "chrono",
- "datafusion-common",
- "datafusion-expr",
- "datafusion-physical-expr",
- "hashbrown",
- "itertools 0.12.1",
- "log",
- "regex-syntax",
-]
-
-[[package]]
-name = "datafusion-physical-expr"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
-dependencies = [
- "ahash",
- "arrow",
- "arrow-array",
"arrow-buffer",
- "arrow-ord",
- "arrow-schema",
- "arrow-string",
- "base64 0.21.7",
+ "base64",
"blake2",
"blake3",
"chrono",
"datafusion-common",
"datafusion-execution",
"datafusion-expr",
- "half",
- "hashbrown",
+ "hashbrown 0.14.5",
"hex",
- "indexmap",
- "itertools 0.12.1",
+ "itertools 0.13.0",
"log",
"md-5",
- "paste",
- "petgraph",
"rand",
"regex",
"sha2",
@@ -1041,46 +1029,195 @@
]
[[package]]
-name = "datafusion-physical-plan"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+name = "datafusion-functions-aggregate"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "ahash",
+ "arrow",
+ "arrow-schema",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-functions-aggregate-common",
+ "datafusion-physical-expr",
+ "datafusion-physical-expr-common",
+ "half",
+ "log",
+ "paste",
+ "sqlparser",
+]
+
+[[package]]
+name = "datafusion-functions-aggregate-common"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "ahash",
+ "arrow",
+ "datafusion-common",
+ "datafusion-expr-common",
+ "datafusion-physical-expr-common",
+ "rand",
+]
+
+[[package]]
+name = "datafusion-functions-nested"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "arrow",
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-ord",
+ "arrow-schema",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-functions",
+ "datafusion-functions-aggregate",
+ "datafusion-physical-expr-common",
+ "itertools 0.13.0",
+ "log",
+ "paste",
+ "rand",
+]
+
+[[package]]
+name = "datafusion-functions-window"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "datafusion-common",
+ "datafusion-expr",
+ "datafusion-physical-expr-common",
+ "log",
+]
+
+[[package]]
+name = "datafusion-optimizer"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "arrow",
+ "async-trait",
+ "chrono",
+ "datafusion-common",
+ "datafusion-expr",
+ "datafusion-physical-expr",
+ "hashbrown 0.14.5",
+ "indexmap",
+ "itertools 0.13.0",
+ "log",
+ "paste",
+ "regex-syntax",
+]
+
+[[package]]
+name = "datafusion-physical-expr"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
dependencies = [
"ahash",
"arrow",
"arrow-array",
"arrow-buffer",
+ "arrow-ord",
"arrow-schema",
- "async-trait",
+ "arrow-string",
+ "base64",
"chrono",
"datafusion-common",
"datafusion-execution",
"datafusion-expr",
+ "datafusion-expr-common",
+ "datafusion-functions-aggregate-common",
+ "datafusion-physical-expr-common",
+ "half",
+ "hashbrown 0.14.5",
+ "hex",
+ "indexmap",
+ "itertools 0.13.0",
+ "log",
+ "paste",
+ "petgraph",
+ "regex",
+]
+
+[[package]]
+name = "datafusion-physical-expr-common"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "ahash",
+ "arrow",
+ "datafusion-common",
+ "datafusion-expr-common",
+ "hashbrown 0.14.5",
+ "rand",
+]
+
+[[package]]
+name = "datafusion-physical-optimizer"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "arrow-schema",
+ "datafusion-common",
+ "datafusion-execution",
"datafusion-physical-expr",
+ "datafusion-physical-plan",
+ "itertools 0.13.0",
+]
+
+[[package]]
+name = "datafusion-physical-plan"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "ahash",
+ "arrow",
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-ord",
+ "arrow-schema",
+ "async-trait",
+ "chrono",
+ "datafusion-common",
+ "datafusion-common-runtime",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-functions-aggregate",
+ "datafusion-functions-aggregate-common",
+ "datafusion-physical-expr",
+ "datafusion-physical-expr-common",
"futures",
"half",
- "hashbrown",
+ "hashbrown 0.14.5",
"indexmap",
- "itertools 0.12.1",
+ "itertools 0.13.0",
"log",
"once_cell",
"parking_lot",
"pin-project-lite",
"rand",
"tokio",
- "uuid",
]
[[package]]
name = "datafusion-sql"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
dependencies = [
"arrow",
+ "arrow-array",
"arrow-schema",
"datafusion-common",
"datafusion-expr",
"log",
+ "regex",
"sqlparser",
+ "strum",
]
[[package]]
@@ -1106,12 +1243,6 @@
]
[[package]]
-name = "doc-comment"
-version = "0.3.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
-
-[[package]]
name = "either"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1164,9 +1295,9 @@
[[package]]
name = "flatbuffers"
-version = "23.5.26"
+version = "24.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640"
+checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f"
dependencies = [
"bitflags 1.3.2",
"rustc_version",
@@ -1174,9 +1305,9 @@
[[package]]
name = "flate2"
-version = "1.0.33"
+version = "1.0.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253"
+checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0"
dependencies = [
"crc32fast",
"miniz_oxide",
@@ -1253,7 +1384,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
@@ -1309,9 +1440,9 @@
[[package]]
name = "gimli"
-version = "0.31.0"
+version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64"
+checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "glob"
@@ -1350,10 +1481,10 @@
]
[[package]]
-name = "heck"
-version = "0.4.1"
+name = "hashbrown"
+version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
+checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
[[package]]
name = "heck"
@@ -1381,9 +1512,9 @@
[[package]]
name = "iana-time-zone"
-version = "0.1.60"
+version = "0.1.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141"
+checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220"
dependencies = [
"android_system_properties",
"core-foundation-sys",
@@ -1414,12 +1545,24 @@
[[package]]
name = "indexmap"
-version = "2.5.0"
+version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5"
+checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
dependencies = [
"equivalent",
- "hashbrown",
+ "hashbrown 0.15.0",
+]
+
+[[package]]
+name = "instant"
+version = "0.1.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
+dependencies = [
+ "cfg-if",
+ "js-sys",
+ "wasm-bindgen",
+ "web-sys",
]
[[package]]
@@ -1582,9 +1725,9 @@
[[package]]
name = "libc"
-version = "0.2.158"
+version = "0.2.159"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
+checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5"
[[package]]
name = "libm"
@@ -1761,28 +1904,28 @@
[[package]]
name = "object"
-version = "0.36.4"
+version = "0.36.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a"
+checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e"
dependencies = [
"memchr",
]
[[package]]
name = "object_store"
-version = "0.9.1"
+version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b8718f8b65fdf67a45108d1548347d4af7d71fb81ce727bbf9e3b2535e079db3"
+checksum = "25a0c4b3a0e31f8b66f71ad8064521efa773910196e2cde791436f13409f3b45"
dependencies = [
"async-trait",
"bytes",
"chrono",
"futures",
"humantime",
- "itertools 0.12.1",
+ "itertools 0.13.0",
"parking_lot",
"percent-encoding",
- "snafu 0.7.5",
+ "snafu",
"tokio",
"tracing",
"url",
@@ -1798,16 +1941,13 @@
[[package]]
name = "orc-rust"
version = "0.3.1"
-source = "git+https://github.com/harveyyue/datafusion-orc.git?rev=f0ff4bcffa762b62e8c57ed4c2f6e1a9547b4abb#f0ff4bcffa762b62e8c57ed4c2f6e1a9547b4abb"
+source = "git+https://github.com/blaze-init/datafusion-orc.git?rev=c54bfb5#c54bfb5bed3e74a51229be54373828e739dd53e6"
dependencies = [
"arrow",
"async-trait",
"bytes",
"chrono",
- "chrono-tz 0.9.0",
- "datafusion",
- "datafusion-expr",
- "datafusion-physical-expr",
+ "chrono-tz",
"fallible-streaming-iterator",
"flate2",
"futures",
@@ -1815,9 +1955,8 @@
"lz4_flex",
"lzokay-native",
"num",
- "object_store",
"prost 0.12.6",
- "snafu 0.8.4",
+ "snafu",
"snap",
"tokio",
"zstd 0.12.4",
@@ -1863,8 +2002,8 @@
[[package]]
name = "parquet"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
dependencies = [
"ahash",
"arrow-array",
@@ -1874,14 +2013,14 @@
"arrow-ipc",
"arrow-schema",
"arrow-select",
- "base64 0.21.7",
+ "base64",
"brotli",
"bytes",
"chrono",
"flate2",
"futures",
"half",
- "hashbrown",
+ "hashbrown 0.14.5",
"lz4_flex",
"num",
"num-bigint",
@@ -1893,6 +2032,7 @@
"tokio",
"twox-hash",
"zstd 0.13.2",
+ "zstd-sys",
]
[[package]]
@@ -1978,9 +2118,9 @@
[[package]]
name = "pkg-config"
-version = "0.3.30"
+version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
+checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2"
[[package]]
name = "ppv-lite86"
@@ -1998,14 +2138,14 @@
checksum = "479cf940fbbb3426c32c5d5176f62ad57549a0bb84773423ba8be9d089f5faba"
dependencies = [
"proc-macro2",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
name = "proc-macro2"
-version = "1.0.86"
+version = "1.0.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
+checksum = "b3e4daa0dcf6feba26f985457cdf104d4b4256fc5a09547140f3631bb076b19a"
dependencies = [
"unicode-ident",
]
@@ -2032,12 +2172,12 @@
[[package]]
name = "prost-build"
-version = "0.13.2"
+version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f8650aabb6c35b860610e9cff5dc1af886c9e25073b7b1712a68972af4281302"
+checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
dependencies = [
"bytes",
- "heck 0.5.0",
+ "heck",
"itertools 0.13.0",
"log",
"multimap",
@@ -2047,7 +2187,7 @@
"prost 0.13.3",
"prost-types",
"regex",
- "syn 2.0.77",
+ "syn 2.0.79",
"tempfile",
]
@@ -2061,7 +2201,7 @@
"itertools 0.12.1",
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
@@ -2074,14 +2214,14 @@
"itertools 0.13.0",
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
name = "prost-types"
-version = "0.13.2"
+version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "60caa6738c7369b940c3d49246a8d1749323674c65cb13010134f5c9bad5b519"
+checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670"
dependencies = [
"prost 0.13.3",
]
@@ -2159,18 +2299,18 @@
[[package]]
name = "redox_syscall"
-version = "0.5.4"
+version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0884ad60e090bf1345b93da0a5de8923c93884cd03f40dfcfddd3b4bee661853"
+checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f"
dependencies = [
"bitflags 2.6.0",
]
[[package]]
name = "regex"
-version = "1.10.6"
+version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619"
+checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8"
dependencies = [
"aho-corasick",
"memchr",
@@ -2180,9 +2320,9 @@
[[package]]
name = "regex-automata"
-version = "0.4.7"
+version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df"
+checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3"
dependencies = [
"aho-corasick",
"memchr",
@@ -2191,9 +2331,9 @@
[[package]]
name = "regex-syntax"
-version = "0.8.4"
+version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
+checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "rustc-demangle"
@@ -2279,7 +2419,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
@@ -2311,9 +2451,9 @@
[[package]]
name = "simdutf8"
-version = "0.1.4"
+version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
+checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e"
[[package]]
name = "siphasher"
@@ -2347,45 +2487,23 @@
[[package]]
name = "snafu"
-version = "0.7.5"
+version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6"
+checksum = "223891c85e2a29c3fe8fb900c1fae5e69c2e42415e3177752e8718475efa5019"
dependencies = [
- "doc-comment",
- "snafu-derive 0.7.5",
-]
-
-[[package]]
-name = "snafu"
-version = "0.8.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b835cb902660db3415a672d862905e791e54d306c6e8189168c7f3d9ae1c79d"
-dependencies = [
- "snafu-derive 0.8.4",
+ "snafu-derive",
]
[[package]]
name = "snafu-derive"
-version = "0.7.5"
+version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf"
+checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917"
dependencies = [
- "heck 0.4.1",
+ "heck",
"proc-macro2",
"quote",
- "syn 1.0.109",
-]
-
-[[package]]
-name = "snafu-derive"
-version = "0.8.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38d1e02fca405f6280643174a50c942219f0bbf4dbf7d480f1dd864d6f211ae5"
-dependencies = [
- "heck 0.5.0",
- "proc-macro2",
- "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
@@ -2415,9 +2533,9 @@
[[package]]
name = "sqlparser"
-version = "0.43.1"
+version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f95c4bae5aba7cd30bd506f7140026ade63cff5afd778af8854026f9606bf5d4"
+checksum = "b2e5b515a2bd5168426033e9efbfd05500114833916f1d5c268f938b4ee130ac"
dependencies = [
"log",
"sqlparser_derive",
@@ -2431,7 +2549,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
@@ -2455,11 +2573,11 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
dependencies = [
- "heck 0.5.0",
+ "heck",
"proc-macro2",
"quote",
"rustversion",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
@@ -2481,9 +2599,9 @@
[[package]]
name = "syn"
-version = "2.0.77"
+version = "2.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed"
+checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590"
dependencies = [
"proc-macro2",
"quote",
@@ -2511,22 +2629,22 @@
[[package]]
name = "thiserror"
-version = "1.0.63"
+version = "1.0.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724"
+checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
-version = "1.0.63"
+version = "1.0.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
+checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
@@ -2595,7 +2713,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
@@ -2622,7 +2740,7 @@
"prost-build",
"prost-types",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
@@ -2644,7 +2762,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
@@ -2680,9 +2798,9 @@
[[package]]
name = "unicode-bidi"
-version = "0.3.15"
+version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75"
+checksum = "5ab17db44d7388991a428b2ee655ce0c212e862eff1768a455c58f9aad6e7893"
[[package]]
name = "unicode-ident"
@@ -2692,9 +2810,9 @@
[[package]]
name = "unicode-normalization"
-version = "0.1.23"
+version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5"
+checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956"
dependencies = [
"tinyvec",
]
@@ -2707,9 +2825,9 @@
[[package]]
name = "unicode-width"
-version = "0.1.13"
+version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d"
+checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af"
[[package]]
name = "url"
@@ -2775,7 +2893,7 @@
"once_cell",
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
"wasm-bindgen-shared",
]
@@ -2797,7 +2915,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@@ -2809,6 +2927,16 @@
checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484"
[[package]]
+name = "web-sys"
+version = "0.3.70"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
+[[package]]
name = "winapi-util"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2944,7 +3072,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.79",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 7228628..cb77161 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -51,42 +51,42 @@
datafusion-ext-functions = { path = "./native-engine/datafusion-ext-functions" }
datafusion-ext-plans = { path = "./native-engine/datafusion-ext-plans" }
-# datafusion: branch=v36-blaze
-datafusion = { version = "36.0.0" }
+# datafusion: branch=v42-blaze
+datafusion = { version = "42.0.0" }
orc-rust = { version = "0.3.1" }
-# arrow: branch=v50-blaze
-arrow = { version = "50.0.0", features = ["ffi"]}
-arrow-schema = { version = "50.0.0", features = ["serde"] }
-parquet = { version = "50.0.0" }
+# arrow: branch=v53-blaze
+arrow = { version = "53.0.0", features = ["ffi"]}
+arrow-schema = { version = "53.0.0", features = ["serde"] }
+parquet = { version = "53.0.0" }
# serde_json: branch=v1.0.96-blaze
serde_json = { version = "1.0.96" }
[patch.crates-io]
-# datafusion: branch=v36-blaze
-datafusion = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
-datafusion-common = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
-datafusion-expr = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
-datafusion-execution = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
-datafusion-optimizer = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
-datafusion-physical-expr = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
-orc-rust = { git = "https://github.com/harveyyue/datafusion-orc.git", rev = "f0ff4bcffa762b62e8c57ed4c2f6e1a9547b4abb"}
+# datafusion: branch=v42-blaze
+datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
+datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
+datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
+datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
+datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
+datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
+orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "c54bfb5"}
-# arrow: branch=v50-blaze
-arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
+# arrow: branch=v53-blaze
+arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
# serde_json: branch=v1.0.96-blaze
serde_json = { git = "https://github.com/blaze-init/json", branch = "v1.0.96-blaze" }
diff --git a/native-engine/blaze-serde/Cargo.toml b/native-engine/blaze-serde/Cargo.toml
index 3f97153..595fecc 100644
--- a/native-engine/blaze-serde/Cargo.toml
+++ b/native-engine/blaze-serde/Cargo.toml
@@ -15,7 +15,7 @@
datafusion-ext-functions = { workspace = true }
datafusion-ext-plans = { workspace = true }
log = "0.4.22"
-object_store = "0.9.0"
+object_store = "0.11.0"
prost = "0.13.3"
[build-dependencies]
diff --git a/native-engine/blaze-serde/build.rs b/native-engine/blaze-serde/build.rs
index a6d281b..44153a8 100644
--- a/native-engine/blaze-serde/build.rs
+++ b/native-engine/blaze-serde/build.rs
@@ -17,7 +17,6 @@
println!("cargo:rerun-if-env-changed=FORCE_REBUILD");
println!("cargo:rerun-if-changed=proto/blaze.proto");
- tonic_build::configure()
- .compile(&["proto/blaze.proto"], &["proto"])
+ tonic_build::compile_protos("proto/blaze.proto")
.map_err(|e| format!("protobuf compilation failed: {}", e))
}
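
Note: tonic_build::compile_protos is the single-file convenience wrapper; it derives the include directory from the proto file's parent, so the explicit configure().compile(&[...], &[...]) call is no longer needed. A minimal sketch of the resulting build script, assuming a recent tonic-build:

// build.rs (sketch)
fn main() -> Result<(), String> {
    println!("cargo:rerun-if-changed=proto/blaze.proto");
    // compile_protos() infers the include dir ("proto") from the file path.
    tonic_build::compile_protos("proto/blaze.proto")
        .map_err(|e| format!("protobuf compilation failed: {}", e))
}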
diff --git a/native-engine/blaze-serde/proto/blaze.proto b/native-engine/blaze-serde/proto/blaze.proto
index cbf965d..a5387be 100644
--- a/native-engine/blaze-serde/proto/blaze.proto
+++ b/native-engine/blaze-serde/proto/blaze.proto
@@ -215,7 +215,7 @@
Sqrt=17;
Tan=18;
Trunc=19;
- Array=20;
+ NullIf=20;
RegexpMatch=21;
BitLength=22;
Btrim=23;
diff --git a/native-engine/blaze-serde/src/from_proto.rs b/native-engine/blaze-serde/src/from_proto.rs
index 6b5f7c9..7ab4677 100644
--- a/native-engine/blaze-serde/src/from_proto.rs
+++ b/native-engine/blaze-serde/src/from_proto.rs
@@ -33,11 +33,10 @@
physical_plan::FileScanConfig,
},
error::DataFusionError,
- execution::context::ExecutionProps,
- logical_expr::{BuiltinScalarFunction, ColumnarValue, Operator},
+ logical_expr::{ColumnarValue, Operator, ScalarUDF, Volatility},
physical_expr::{
expressions::{in_list, LikeExpr, SCAndExpr, SCOrExpr},
- functions, ScalarFunctionExpr,
+ ScalarFunctionExpr,
},
physical_plan::{
expressions as phys_expr,
@@ -48,6 +47,7 @@
union::UnionExec,
ColumnStatistics, ExecutionPlan, Partitioning, PhysicalExpr, Statistics,
},
+ prelude::create_udf,
};
use datafusion_ext_commons::downcast_any;
use datafusion_ext_exprs::{
@@ -116,7 +116,7 @@
let new_children = expr_in
.children()
.iter()
- .map(|child_expr| bind(child_expr.clone(), input_schema))
+ .map(|&child_expr| bind(child_expr.clone(), input_schema))
.collect::<Result<Vec<_>, DataFusionError>>()?;
Ok(expr_in.with_new_children(new_children)?)
}
@@ -804,74 +804,75 @@
}
}
-impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
- fn from(f: &protobuf::ScalarFunction) -> BuiltinScalarFunction {
+impl From<protobuf::ScalarFunction> for Arc<ScalarUDF> {
+ fn from(f: protobuf::ScalarFunction) -> Self {
+ use datafusion::functions as f;
use protobuf::ScalarFunction;
+
match f {
- ScalarFunction::Sqrt => Self::Sqrt,
- ScalarFunction::Sin => Self::Sin,
- ScalarFunction::Cos => Self::Cos,
- ScalarFunction::Tan => Self::Tan,
- ScalarFunction::Asin => Self::Asin,
- ScalarFunction::Acos => Self::Acos,
- ScalarFunction::Atan => Self::Atan,
- ScalarFunction::Exp => Self::Exp,
- ScalarFunction::Log => Self::Log,
- ScalarFunction::Ln => Self::Ln,
- ScalarFunction::Log10 => Self::Log10,
- ScalarFunction::Floor => Self::Floor,
- ScalarFunction::Ceil => Self::Ceil,
- ScalarFunction::Round => Self::Round,
- ScalarFunction::Trunc => Self::Trunc,
- ScalarFunction::Abs => Self::Abs,
- ScalarFunction::OctetLength => Self::OctetLength,
- ScalarFunction::Concat => Self::Concat,
- ScalarFunction::Lower => Self::Lower,
- ScalarFunction::Upper => Self::Upper,
- ScalarFunction::Trim => Self::Trim,
- ScalarFunction::Ltrim => Self::Ltrim,
- ScalarFunction::Rtrim => Self::Rtrim,
- ScalarFunction::ToTimestamp => Self::ToTimestamp,
- ScalarFunction::Array => Self::MakeArray,
- // ScalarFunction::NullIf => todo!(),
- ScalarFunction::DatePart => Self::DatePart,
- ScalarFunction::DateTrunc => Self::DateTrunc,
- ScalarFunction::Md5 => Self::MD5,
- ScalarFunction::Sha224 => Self::SHA224,
- ScalarFunction::Sha256 => Self::SHA256,
- ScalarFunction::Sha384 => Self::SHA384,
- ScalarFunction::Sha512 => Self::SHA512,
- ScalarFunction::Digest => Self::Digest,
- ScalarFunction::ToTimestampMillis => Self::ToTimestampMillis,
- ScalarFunction::Log2 => Self::Log2,
- ScalarFunction::Signum => Self::Signum,
- ScalarFunction::Ascii => Self::Ascii,
- ScalarFunction::BitLength => Self::BitLength,
- ScalarFunction::Btrim => Self::Btrim,
- ScalarFunction::CharacterLength => Self::CharacterLength,
- ScalarFunction::Chr => Self::Chr,
- ScalarFunction::ConcatWithSeparator => Self::ConcatWithSeparator,
- ScalarFunction::InitCap => Self::InitCap,
- ScalarFunction::Left => Self::Left,
- ScalarFunction::Lpad => Self::Lpad,
- ScalarFunction::Random => Self::Random,
- ScalarFunction::RegexpReplace => Self::RegexpReplace,
- ScalarFunction::Repeat => Self::Repeat,
- ScalarFunction::Replace => Self::Replace,
- ScalarFunction::Reverse => Self::Reverse,
- ScalarFunction::Right => Self::Right,
- ScalarFunction::Rpad => Self::Rpad,
- ScalarFunction::SplitPart => Self::SplitPart,
- ScalarFunction::StartsWith => Self::StartsWith,
- ScalarFunction::Strpos => Self::Strpos,
- ScalarFunction::Substr => Self::Substr,
- ScalarFunction::ToHex => Self::ToHex,
- ScalarFunction::ToTimestampMicros => Self::ToTimestampMicros,
- ScalarFunction::ToTimestampSeconds => Self::ToTimestampSeconds,
- ScalarFunction::Now => Self::Now,
- ScalarFunction::Translate => Self::Translate,
- ScalarFunction::RegexpMatch => Self::RegexpMatch,
- ScalarFunction::Coalesce => Self::Coalesce,
+ ScalarFunction::Sqrt => f::math::sqrt(),
+ ScalarFunction::Sin => f::math::sin(),
+ ScalarFunction::Cos => f::math::cos(),
+ ScalarFunction::Tan => f::math::tan(),
+ ScalarFunction::Asin => f::math::asin(),
+ ScalarFunction::Acos => f::math::acos(),
+ ScalarFunction::Atan => f::math::atan(),
+ ScalarFunction::Exp => f::math::exp(),
+ ScalarFunction::Log => f::math::log(),
+ ScalarFunction::Ln => f::math::ln(),
+ ScalarFunction::Log10 => f::math::log10(),
+ ScalarFunction::Floor => f::math::floor(),
+ ScalarFunction::Ceil => f::math::ceil(),
+ ScalarFunction::Round => f::math::round(),
+ ScalarFunction::Trunc => f::math::trunc(),
+ ScalarFunction::Abs => f::math::abs(),
+ ScalarFunction::OctetLength => f::string::octet_length(),
+ ScalarFunction::Concat => f::string::concat(),
+ ScalarFunction::Lower => f::string::lower(),
+ ScalarFunction::Upper => f::string::upper(),
+ ScalarFunction::Trim => f::string::btrim(),
+ ScalarFunction::Ltrim => f::string::ltrim(),
+ ScalarFunction::Rtrim => f::string::rtrim(),
+ ScalarFunction::ToTimestamp => f::datetime::to_timestamp(),
+ ScalarFunction::NullIf => f::core::nullif(),
+ ScalarFunction::DatePart => f::datetime::date_part(),
+ ScalarFunction::DateTrunc => f::datetime::date_trunc(),
+ ScalarFunction::Md5 => f::crypto::md5(),
+ ScalarFunction::Sha224 => f::crypto::sha224(),
+ ScalarFunction::Sha256 => f::crypto::sha256(),
+ ScalarFunction::Sha384 => f::crypto::sha384(),
+ ScalarFunction::Sha512 => f::crypto::sha512(),
+ ScalarFunction::Digest => f::crypto::digest(),
+ ScalarFunction::ToTimestampMillis => f::datetime::to_timestamp_millis(),
+ ScalarFunction::Log2 => f::math::log2(),
+ ScalarFunction::Signum => f::math::signum(),
+ ScalarFunction::Ascii => f::string::ascii(),
+ ScalarFunction::BitLength => f::string::bit_length(),
+ ScalarFunction::Btrim => f::string::btrim(),
+ ScalarFunction::CharacterLength => f::unicode::character_length(),
+ ScalarFunction::Chr => f::string::chr(),
+ ScalarFunction::ConcatWithSeparator => f::string::concat_ws(),
+ ScalarFunction::InitCap => f::string::initcap(),
+ ScalarFunction::Left => f::unicode::left(),
+ ScalarFunction::Lpad => f::unicode::lpad(),
+ ScalarFunction::Random => f::math::random(),
+ ScalarFunction::RegexpReplace => f::regex::regexp_replace(),
+ ScalarFunction::Repeat => f::string::repeat(),
+ ScalarFunction::Replace => f::string::replace(),
+ ScalarFunction::Reverse => f::unicode::reverse(),
+ ScalarFunction::Right => f::unicode::right(),
+ ScalarFunction::Rpad => f::unicode::rpad(),
+ ScalarFunction::SplitPart => f::string::split_part(),
+ ScalarFunction::StartsWith => f::string::starts_with(),
+ ScalarFunction::Strpos => f::unicode::strpos(),
+ ScalarFunction::Substr => f::unicode::substr(),
+ ScalarFunction::ToHex => f::string::to_hex(),
+ ScalarFunction::ToTimestampMicros => f::datetime::to_timestamp_micros(),
+ ScalarFunction::ToTimestampSeconds => f::datetime::to_timestamp_seconds(),
+ ScalarFunction::Now => f::datetime::now(),
+ ScalarFunction::Translate => f::unicode::translate(),
+ ScalarFunction::RegexpMatch => f::regex::regexp_match(),
+ ScalarFunction::Coalesce => f::core::coalesce(),
ScalarFunction::SparkExtFunctions => {
unreachable!()
}
@@ -998,20 +999,26 @@
.map(|x| try_parse_physical_expr(x, input_schema))
.collect::<Result<Vec<_>, _>>()?;
- let execution_props = ExecutionProps::new();
- let fun_expr = if scalar_function == protobuf::ScalarFunction::SparkExtFunctions {
- datafusion_ext_functions::create_spark_ext_function(&e.name)?
+ let scalar_udf = if scalar_function == protobuf::ScalarFunction::SparkExtFunctions {
+ let fun = datafusion_ext_functions::create_spark_ext_function(&e.name)?;
+ Arc::new(create_udf(
+ "spark_ext_function",
+ args.iter()
+ .map(|e| e.data_type(input_schema))
+ .collect::<Result<Vec<_>, _>>()?,
+ Arc::new(convert_required!(e.return_type)?),
+ Volatility::Volatile,
+ fun,
+ ))
} else {
- functions::create_physical_fun(&(&scalar_function).into(), &execution_props)?
+ let scalar_udf: Arc<ScalarUDF> = scalar_function.into();
+ scalar_udf
};
-
Arc::new(ScalarFunctionExpr::new(
- &e.name,
- fun_expr,
+ scalar_udf.name(),
+ scalar_udf.clone(),
args,
convert_required!(e.return_type)?,
- None,
- false,
))
}
ExprType::SparkUdfWrapperExpr(e) => Arc::new(SparkUDFWrapperExpr::try_new(
@@ -1153,6 +1160,7 @@
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
+ statistics: None,
extensions: None,
})
}
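
DataFusion 42 removes BuiltinScalarFunction: every scalar function is a ScalarUDF, and ScalarFunctionExpr::new now takes the UDF itself instead of a bare function pointer plus ExecutionProps. A sketch of the new wiring against the pinned fork, mirroring the SparkExtFunctions branch above; the "my_kernel" name and identity kernel are hypothetical placeholders:

use std::sync::Arc;

use datafusion::{
    arrow::datatypes::DataType,
    error::Result,
    logical_expr::{ColumnarValue, Volatility},
    physical_expr::ScalarFunctionExpr,
    physical_plan::PhysicalExpr,
    prelude::create_udf,
};

// Wrap an opaque kernel as a ScalarUDF, then build the physical expression.
fn wrap_kernel(
    args: Vec<Arc<dyn PhysicalExpr>>,
    arg_types: Vec<DataType>,
    return_type: DataType,
) -> Result<Arc<dyn PhysicalExpr>> {
    let udf = Arc::new(create_udf(
        "my_kernel", // hypothetical name
        arg_types,
        Arc::new(return_type.clone()),
        Volatility::Volatile, // Spark UDFs may be non-deterministic
        Arc::new(|args: &[ColumnarValue]| Ok(args[0].clone())), // placeholder kernel
    ));
    Ok(Arc::new(ScalarFunctionExpr::new(
        udf.name(),
        udf.clone(),
        args,
        return_type,
    )))
}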
diff --git a/native-engine/blaze-serde/src/lib.rs b/native-engine/blaze-serde/src/lib.rs
index 0b0b8b3..223496b 100644
--- a/native-engine/blaze-serde/src/lib.rs
+++ b/native-engine/blaze-serde/src/lib.rs
@@ -498,7 +498,11 @@
.map(|val| val.try_into())
.collect::<Result<Vec<_>, _>>()?;
let scalar_type: DataType = pb_scalar_type.try_into()?;
- ScalarValue::List(ScalarValue::new_list(&typechecked_values, &scalar_type))
+ ScalarValue::List(ScalarValue::new_list(
+ &typechecked_values,
+ &scalar_type,
+ true,
+ ))
}
protobuf::scalar_value::Value::NullValue(v) => {
match v.datatype.as_ref().expect("missing scalar data type") {
@@ -633,6 +637,7 @@
Ok(ScalarValue::List(ScalarValue::new_list(
&values,
&element_scalar_type,
+ true,
)))
}
}
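
ScalarValue::new_list grew a third nullable argument for the list's item field; passing true preserves the old permissive behavior and is required whenever the values may contain nulls. A small sketch against the DataFusion 42 signature:

use datafusion::{arrow::datatypes::DataType, common::ScalarValue};

// Build a List scalar; the trailing `true` marks the item field nullable,
// needed here because one element is NULL.
fn make_list_scalar() -> ScalarValue {
    let values = [ScalarValue::Int32(Some(1)), ScalarValue::Int32(None)];
    ScalarValue::List(ScalarValue::new_list(&values, &DataType::Int32, true))
}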
diff --git a/native-engine/blaze/src/metrics.rs b/native-engine/blaze/src/metrics.rs
index ecb47fb..51665b3 100644
--- a/native-engine/blaze/src/metrics.rs
+++ b/native-engine/blaze/src/metrics.rs
@@ -39,7 +39,7 @@
)?;
// update children nodes
- for (i, child_plan) in execution_plan.children().iter().enumerate() {
+ for (i, &child_plan) in execution_plan.children().iter().enumerate() {
let child_metric_node = jni_call!(
SparkMetricNode(metric_node).getChild(i as i32) -> JObject
)?;
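
In DataFusion 42, ExecutionPlan::children() returns Vec<&Arc<dyn ExecutionPlan>> (borrows instead of clones), which is why the loop pattern gains a `&`. A sketch of the new iteration shape:

use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;

// Walk a plan tree without cloning Arcs: `children()` hands out borrows,
// so the tuple pattern destructures one level of reference.
fn visit(plan: &Arc<dyn ExecutionPlan>, depth: usize) {
    for (i, &child) in plan.children().iter().enumerate() {
        println!("{:indent$}child {}: {}", "", i, child.name(), indent = depth * 2);
        visit(child, depth + 1);
    }
}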
diff --git a/native-engine/datafusion-ext-commons/src/io/batch_serde.rs b/native-engine/datafusion-ext-commons/src/io/batch_serde.rs
index d10ddb6..df46dfc 100644
--- a/native-engine/datafusion-ext-commons/src/io/batch_serde.rs
+++ b/native-engine/datafusion-ext-commons/src/io/batch_serde.rs
@@ -162,14 +162,13 @@
fn read_bits_buffer<R: Read>(input: &mut R, bits_len: usize) -> Result<Buffer> {
let buf = read_bytes_slice(input, (bits_len + 7) / 8)?;
- Ok(Buffer::from(buf))
+ Ok(Buffer::from_vec(buf.into()))
}
fn write_primitive_array<W: Write, PT: ArrowPrimitiveType>(
array: &PrimitiveArray<PT>,
output: &mut W,
) -> Result<()> {
- let _item_size = PT::get_byte_width();
let offset = array.offset();
let len = array.len();
let array_data = array.to_data();
@@ -510,7 +509,7 @@
let offsets_buffer: Buffer = offsets_buffer.into();
let data_len = cur_offset as usize;
- let data_buffer = Buffer::from(read_bytes_slice(input, data_len)?);
+ let data_buffer = Buffer::from_vec(read_bytes_slice(input, data_len)?.into());
let array_data = ArrayData::try_new(
data_type,
num_rows,
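
Buffer::from(&[u8]) copies the bytes, while Buffer::from_vec takes ownership of the allocation; since read_bytes_slice returns an owned Box<[u8]> (a free conversion to Vec<u8>), the rewrite presumably drops one copy per buffer. The pattern, sketched:

use arrow::buffer::Buffer;

// Box<[u8]> -> Vec<u8> is allocation-free, and from_vec reuses the Vec's
// backing memory instead of memcpy-ing it into a fresh Buffer.
fn bytes_to_buffer(bytes: Box<[u8]>) -> Buffer {
    Buffer::from_vec(bytes.into())
}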
diff --git a/native-engine/datafusion-ext-commons/src/io/scalar_serde.rs b/native-engine/datafusion-ext-commons/src/io/scalar_serde.rs
index 344c785..c634644 100644
--- a/native-engine/datafusion-ext-commons/src/io/scalar_serde.rs
+++ b/native-engine/datafusion-ext-commons/src/io/scalar_serde.rs
@@ -90,8 +90,8 @@
write_array(col, output)?;
}
}
- ScalarValue::Map(value, _bool) => {
- write_scalar(value, nullable, output)?;
+ ScalarValue::Map(v) => {
+ write_array(v.as_ref(), output)?;
}
other => df_unimplemented_err!("unsupported scalarValue type: {other}")?,
}
@@ -186,9 +186,9 @@
.collect::<Result<Vec<_>>>()?;
ScalarValue::Struct(Arc::new(StructArray::new(fields.clone(), columns, None)))
}
- DataType::Map(field, bool) => {
- let map_value = read_scalar(input, field.data_type(), field.is_nullable())?;
- ScalarValue::Map(Box::new(map_value), *bool)
+ DataType::Map(field, _bool) => {
+ let map = read_array(input, field.data_type(), 1)?.as_map().clone();
+ ScalarValue::Map(Arc::new(map))
}
other => df_unimplemented_err!("unsupported data type: {other}")?,
})
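
ScalarValue::Map now wraps a one-row Arc<MapArray> rather than a boxed struct scalar plus a sortedness flag, so serialization can reuse the generic write_array/read_array paths. A sketch of constructing such a scalar with arrow's MapBuilder (the {"a": 1, "b": 2} content is illustrative):

use std::sync::Arc;

use arrow::array::{Int32Builder, MapBuilder, StringBuilder};
use datafusion::common::ScalarValue;

// Build a single-row map {"a": 1, "b": 2} and wrap it as a scalar value.
fn map_scalar() -> ScalarValue {
    let mut builder = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
    builder.keys().append_value("a");
    builder.values().append_value(1);
    builder.keys().append_value("b");
    builder.values().append_value(2);
    builder.append(true).unwrap(); // close row 0 (valid, non-null)
    ScalarValue::Map(Arc::new(builder.finish()))
}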
diff --git a/native-engine/datafusion-ext-commons/src/spark_hash.rs b/native-engine/datafusion-ext-commons/src/spark_hash.rs
index b095a89..a02c43b 100644
--- a/native-engine/datafusion-ext-commons/src/spark_hash.rs
+++ b/native-engine/datafusion-ext-commons/src/spark_hash.rs
@@ -547,12 +547,14 @@
// Construct key and values
let key_data = ArrayData::builder(DataType::Int32)
.len(8)
- .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
+ .add_buffer(Buffer::from_slice_ref(
+ &[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice(),
+ ))
.build()
.unwrap();
let value_data = ArrayData::builder(DataType::UInt32)
.len(8)
- .add_buffer(Buffer::from(
+ .add_buffer(Buffer::from_slice_ref(
&[0u32, 10, 20, 0, 40, 0, 60, 70].to_byte_slice(),
))
.null_bit_buffer(Some(Buffer::from(&[0b11010110])))
@@ -561,7 +563,7 @@
// Construct a buffer for value offsets, for the nested array:
// [[0, 1, 2], [3, 4, 5], [6, 7]]
- let entry_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice());
+ let entry_offsets = Buffer::from_slice_ref(&[0, 3, 6, 8].to_byte_slice());
let keys_field = Arc::new(Field::new("keys", DataType::Int32, false));
let values_field = Arc::new(Field::new("values", DataType::UInt32, true));
diff --git a/native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs b/native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs
index fc03745..d1c5910 100644
--- a/native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs
+++ b/native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs
@@ -19,12 +19,14 @@
};
use arrow::{
- array::{make_array, new_empty_array, Array, ArrayRef, AsArray, Capacities, MutableArrayData},
+ array::{
+ make_array, new_empty_array, Array, ArrayRef, AsArray, Capacities, MutableArrayData,
+ RecordBatch, RecordBatchOptions,
+ },
datatypes::{
ArrowNativeType, BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, SchemaRef,
Utf8Type,
},
- record_batch::{RecordBatch, RecordBatchOptions},
};
use arrow_schema::DataType;
use datafusion::{
diff --git a/native-engine/datafusion-ext-exprs/src/bloom_filter_might_contain.rs b/native-engine/datafusion-ext-exprs/src/bloom_filter_might_contain.rs
index 32878a3..5b9bb49 100644
--- a/native-engine/datafusion-ext-exprs/src/bloom_filter_might_contain.rs
+++ b/native-engine/datafusion-ext-exprs/src/bloom_filter_might_contain.rs
@@ -123,8 +123,8 @@
Ok(ColumnarValue::Array(Arc::new(might_contain)))
}
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.bloom_filter_expr.clone(), self.value_expr.clone()]
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+ vec![&self.bloom_filter_expr, &self.value_expr]
}
fn with_new_children(
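
The `children` change here (and in every expression file below) follows the DataFusion-42 `PhysicalExpr` trait, which now hands out borrowed children; `with_new_children` is unchanged and still consumes owned `Arc`s. A minimal sketch of the new contract:

```rust
use std::sync::Arc;
use datafusion::physical_expr::PhysicalExpr;

// v42: children are borrowed from the parent expression; callers clone
// individual Arcs only when they actually need ownership.
fn owned_children(expr: &dyn PhysicalExpr) -> Vec<Arc<dyn PhysicalExpr>> {
    expr.children().into_iter().cloned().collect()
}
```
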
diff --git a/native-engine/datafusion-ext-exprs/src/cast.rs b/native-engine/datafusion-ext-exprs/src/cast.rs
index 78a1e0f..b3cd640 100644
--- a/native-engine/datafusion-ext-exprs/src/cast.rs
+++ b/native-engine/datafusion-ext-exprs/src/cast.rs
@@ -82,8 +82,8 @@
})
}
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+ vec![&self.expr]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-exprs/src/get_indexed_field.rs b/native-engine/datafusion-ext-exprs/src/get_indexed_field.rs
index 6fd6620..0a94ec5 100644
--- a/native-engine/datafusion-ext-exprs/src/get_indexed_field.rs
+++ b/native-engine/datafusion-ext-exprs/src/get_indexed_field.rs
@@ -133,8 +133,8 @@
}
}
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.arg.clone()]
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+ vec![&self.arg]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-exprs/src/get_map_value.rs b/native-engine/datafusion-ext-exprs/src/get_map_value.rs
index 387eed4..718fedb 100644
--- a/native-engine/datafusion-ext-exprs/src/get_map_value.rs
+++ b/native-engine/datafusion-ext-exprs/src/get_map_value.rs
@@ -21,6 +21,7 @@
use arrow::{
array::*,
+ compute::SortOptions,
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
@@ -95,7 +96,8 @@
let as_map_array = array.as_map();
let value_data = as_map_array.values().to_data();
let key = self.key.to_array()?;
- let comparator = build_compare(as_map_array.keys(), &key)?;
+ let comparator =
+ make_comparator(as_map_array.keys(), &key, SortOptions::default())?;
let mut mutable =
MutableArrayData::new(vec![&value_data], true, as_map_array.len());
@@ -125,8 +127,8 @@
}
}
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.arg.clone()]
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+ vec![&self.arg]
}
fn with_new_children(
@@ -178,19 +180,21 @@
// Construct key and values
let key_data = ArrayData::builder(DataType::Int32)
.len(8)
- .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
+ .add_buffer(Buffer::from_slice_ref(
+ &[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice(),
+ ))
.build()
.unwrap();
let value_data = ArrayData::builder(DataType::UInt32)
.len(8)
- .add_buffer(Buffer::from(
+ .add_buffer(Buffer::from_slice_ref(
&[0u32, 10, 20, 0, 40, 0, 60, 70].to_byte_slice(),
))
- .null_bit_buffer(Some(Buffer::from(&[0b11010110])))
+ .null_bit_buffer(Some(Buffer::from_slice_ref(&[0b11010110])))
.build()
.unwrap();
- let entry_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice());
+ let entry_offsets = Buffer::from_slice_ref(&[0, 3, 6, 8].to_byte_slice());
let keys_field = Arc::new(Field::new("keys", DataType::Int32, false));
let values_field = Arc::new(Field::new("values", DataType::UInt32, true));
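
On the comparator change above: arrow-53 removed `build_compare` in favor of `make_comparator`, which additionally takes `SortOptions` (sort direction and null placement). A usage sketch; the `arrow_ord::ord` import path assumes a direct arrow-ord dependency, whereas the patched file picks the function up through its `arrow::array::*` glob:

```rust
use arrow::array::Int32Array;
use arrow::compute::SortOptions;
use arrow_ord::ord::make_comparator; // path is an assumption, see note above

fn main() -> Result<(), arrow::error::ArrowError> {
    let a = Int32Array::from(vec![1, 2, 3]);
    let b = Int32Array::from(vec![2]);
    // make_comparator replaces build_compare and adds SortOptions.
    let cmp = make_comparator(&a, &b, SortOptions::default())?;
    assert_eq!(cmp(1, 0), std::cmp::Ordering::Equal); // a[1] == b[0]
    Ok(())
}
```
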
diff --git a/native-engine/datafusion-ext-exprs/src/named_struct.rs b/native-engine/datafusion-ext-exprs/src/named_struct.rs
index 0093869..a217731 100644
--- a/native-engine/datafusion-ext-exprs/src/named_struct.rs
+++ b/native-engine/datafusion-ext-exprs/src/named_struct.rs
@@ -105,8 +105,8 @@
Ok(ColumnarValue::Array(named_struct))
}
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- self.values.clone()
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+ self.values.iter().collect()
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-exprs/src/row_num.rs b/native-engine/datafusion-ext-exprs/src/row_num.rs
index 4def27f..d281348 100644
--- a/native-engine/datafusion-ext-exprs/src/row_num.rs
+++ b/native-engine/datafusion-ext-exprs/src/row_num.rs
@@ -71,7 +71,7 @@
Ok(ColumnarValue::Array(Arc::new(array)))
}
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![]
}
diff --git a/native-engine/datafusion-ext-exprs/src/spark_scalar_subquery_wrapper.rs b/native-engine/datafusion-ext-exprs/src/spark_scalar_subquery_wrapper.rs
index 1da0e76..20dd9c6 100644
--- a/native-engine/datafusion-ext-exprs/src/spark_scalar_subquery_wrapper.rs
+++ b/native-engine/datafusion-ext-exprs/src/spark_scalar_subquery_wrapper.rs
@@ -113,7 +113,7 @@
result.cloned()
}
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![]
}
diff --git a/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs b/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
index 688101d..98c5334 100644
--- a/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
+++ b/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
@@ -197,8 +197,8 @@
Ok(ColumnarValue::Array(imported_array))
}
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- self.params.clone()
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+ self.params.iter().collect()
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-exprs/src/string_contains.rs b/native-engine/datafusion-ext-exprs/src/string_contains.rs
index 023863c..770708d 100644
--- a/native-engine/datafusion-ext-exprs/src/string_contains.rs
+++ b/native-engine/datafusion-ext-exprs/src/string_contains.rs
@@ -101,8 +101,8 @@
}
}
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+ vec![&self.expr]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-exprs/src/string_ends_with.rs b/native-engine/datafusion-ext-exprs/src/string_ends_with.rs
index 514fd7e..34f0b42 100644
--- a/native-engine/datafusion-ext-exprs/src/string_ends_with.rs
+++ b/native-engine/datafusion-ext-exprs/src/string_ends_with.rs
@@ -100,8 +100,8 @@
}
}
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+ vec![&self.expr]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-exprs/src/string_starts_with.rs b/native-engine/datafusion-ext-exprs/src/string_starts_with.rs
index 4bb8d96..672cafc 100644
--- a/native-engine/datafusion-ext-exprs/src/string_starts_with.rs
+++ b/native-engine/datafusion-ext-exprs/src/string_starts_with.rs
@@ -100,8 +100,8 @@
}
}
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+ vec![&self.expr]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-functions/src/spark_dates.rs b/native-engine/datafusion-ext-functions/src/spark_dates.rs
index 7922339..265255b 100644
--- a/native-engine/datafusion-ext-functions/src/spark_dates.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_dates.rs
@@ -12,22 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use arrow::compute::{day_dyn, month_dyn, year_dyn};
+use arrow::compute::{date_part, DatePart};
use datafusion::{common::Result, physical_plan::ColumnarValue};
pub fn spark_year(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let input = args[0].clone().into_array(1)?;
- Ok(ColumnarValue::Array(year_dyn(&input)?))
+ Ok(ColumnarValue::Array(date_part(&input, DatePart::Year)?))
}
pub fn spark_month(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let input = args[0].clone().into_array(1)?;
- Ok(ColumnarValue::Array(month_dyn(&input)?))
+ Ok(ColumnarValue::Array(date_part(&input, DatePart::Month)?))
}
pub fn spark_day(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let input = args[0].clone().into_array(1)?;
- Ok(ColumnarValue::Array(day_dyn(&input)?))
+ Ok(ColumnarValue::Array(date_part(&input, DatePart::Day)?))
}
#[cfg(test)]
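
The date kernels above move to arrow-53's unified `date_part` kernel, parameterized by a `DatePart` enum instead of per-field `*_dyn` functions. Usage sketch:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Date32Array};
use arrow::compute::{date_part, DatePart};

fn main() -> Result<(), arrow::error::ArrowError> {
    // Date32 stores days since the UNIX epoch; 19000 days is early 2022.
    let dates: ArrayRef = Arc::new(Date32Array::from(vec![Some(19000), None]));
    let years = date_part(&dates, DatePart::Year)?; // Int32 result, null-preserving
    println!("{years:?}");
    Ok(())
}
```
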
diff --git a/native-engine/datafusion-ext-functions/src/spark_make_array.rs b/native-engine/datafusion-ext-functions/src/spark_make_array.rs
index 4d2159e..a86569c 100644
--- a/native-engine/datafusion-ext-functions/src/spark_make_array.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_make_array.rs
@@ -118,6 +118,7 @@
output_scalars.push(ScalarValue::List(ScalarValue::new_list(
&row_scalars,
data_type,
+ true,
)));
}
ScalarValue::iter_to_array(output_scalars)?
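
`ScalarValue::new_list` gains a third `nullable` argument in DataFusion 42 (the element nullability of the resulting list field), which is what the added `true` supplies here and in the collect_list/collect_set aggregates below. Sketch:

```rust
use arrow::datatypes::DataType;
use datafusion::common::ScalarValue;

fn main() {
    let elems = vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(None)];
    // Third argument: the list's element field is nullable.
    let list = ScalarValue::List(ScalarValue::new_list(&elems, &DataType::Int32, true));
    println!("{list}");
}
```
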
diff --git a/native-engine/datafusion-ext-plans/Cargo.toml b/native-engine/datafusion-ext-plans/Cargo.toml
index 7b35284..f28da14 100644
--- a/native-engine/datafusion-ext-plans/Cargo.toml
+++ b/native-engine/datafusion-ext-plans/Cargo.toml
@@ -30,7 +30,7 @@
log = "0.4.22"
lz4_flex = "0.11.2"
num = "0.4.2"
-object_store = "0.9.0"
+object_store = "0.11.0"
once_cell = "1.20.2"
panic-message = "0.3.0"
parking_lot = "0.12.3"
@@ -44,3 +44,6 @@
zstd = "0.13.2"
orc-rust = { workspace = true }
futures-util = "0.3.31"
+
+[dev-dependencies]
+rand = "0.8.5"
diff --git a/native-engine/datafusion-ext-plans/src/agg/collect_list.rs b/native-engine/datafusion-ext-plans/src/agg/collect_list.rs
index c6cc796..7c1cba4 100644
--- a/native-engine/datafusion-ext-plans/src/agg/collect_list.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/collect_list.rs
@@ -203,6 +203,7 @@
.into_values(self.arg_type.clone(), false)
.collect::<Vec<_>>(),
&self.arg_type,
+ true,
)))
}
None => ScalarValue::try_from(&self.data_type),
diff --git a/native-engine/datafusion-ext-plans/src/agg/collect_set.rs b/native-engine/datafusion-ext-plans/src/agg/collect_set.rs
index a145ad4..497eb47 100644
--- a/native-engine/datafusion-ext-plans/src/agg/collect_set.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/collect_set.rs
@@ -200,6 +200,7 @@
Ok(ScalarValue::List(ScalarValue::new_list(
&set.into_iter().collect::<Vec<_>>(),
&self.arg_type,
+ true,
)))
}
None => ScalarValue::try_from(&self.data_type),
diff --git a/native-engine/datafusion-ext-plans/src/agg/mod.rs b/native-engine/datafusion-ext-plans/src/agg/mod.rs
index 36cdd22..d092cda 100644
--- a/native-engine/datafusion-ext-plans/src/agg/mod.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/mod.rs
@@ -38,7 +38,7 @@
use arrow::{array::*, datatypes::*};
use datafusion::{
common::{Result, ScalarValue},
- logical_expr::aggregate_function,
+ // logical_expr::aggregate_function,
physical_expr::PhysicalExpr,
};
use datafusion_ext_commons::df_execution_err;
@@ -196,6 +196,8 @@
children: &[Arc<dyn PhysicalExpr>],
input_schema: &SchemaRef,
) -> Result<Arc<dyn Agg>> {
+ use datafusion::logical_expr::type_coercion::aggregates::*;
+
Ok(match agg_function {
AggFunction::Count => {
let return_type = DataType::Int64;
@@ -203,10 +205,12 @@
}
AggFunction::Sum => {
let arg_type = children[0].data_type(input_schema)?;
- let return_type = aggregate_function::AggregateFunction::return_type(
- &aggregate_function::AggregateFunction::Sum,
- &[arg_type],
- )?;
+ let return_type = match arg_type {
+ DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
+ DataType::Int64
+ }
+ other => sum_return_type(&other)?,
+ };
Arc::new(sum::AggSum::try_new(
Arc::new(TryCastExpr::new(children[0].clone(), return_type.clone())),
return_type,
@@ -214,10 +218,7 @@
}
AggFunction::Avg => {
let arg_type = children[0].data_type(input_schema)?;
- let return_type = aggregate_function::AggregateFunction::return_type(
- &aggregate_function::AggregateFunction::Avg,
- &[arg_type],
- )?;
+ let return_type = avg_return_type("avg", &arg_type)?;
Arc::new(avg::AggAvg::try_new(
Arc::new(TryCastExpr::new(children[0].clone(), return_type.clone())),
return_type,
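
`AggregateFunction::return_type` no longer exists in DataFusion 42, so the return types above are derived from the `type_coercion::aggregates` helpers, with integer sums pinned to `Int64` to match Spark semantics. A sketch of that derivation:

```rust
use arrow::datatypes::DataType;
use datafusion::common::Result;
use datafusion::logical_expr::type_coercion::aggregates::{avg_return_type, sum_return_type};

// Spark-style SUM: any integer input sums into Int64; everything else
// (e.g. Float64, Decimal128) defers to DataFusion's coercion rules.
fn spark_sum_return_type(arg: &DataType) -> Result<DataType> {
    Ok(match arg {
        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => DataType::Int64,
        other => sum_return_type(other)?,
    })
}

fn spark_avg_return_type(arg: &DataType) -> Result<DataType> {
    avg_return_type("avg", arg)
}
```
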
diff --git a/native-engine/datafusion-ext-plans/src/agg_exec.rs b/native-engine/datafusion-ext-plans/src/agg_exec.rs
index 2b4c1e0..2e1d2dd 100644
--- a/native-engine/datafusion-ext-plans/src/agg_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/agg_exec.rs
@@ -26,17 +26,19 @@
use datafusion::{
common::{Result, Statistics},
execution::context::TaskContext,
- physical_expr::PhysicalSortExpr,
+ physical_expr::EquivalenceProperties,
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, SendableRecordBatchStream,
},
};
use datafusion_ext_commons::{
batch_size, slim_bytes::SlimBytes, streams::coalesce_stream::CoalesceInput,
};
use futures::{stream::once, StreamExt, TryFutureExt, TryStreamExt};
+use once_cell::sync::OnceCell;
use crate::{
agg::{
@@ -58,6 +60,7 @@
input: Arc<dyn ExecutionPlan>,
agg_ctx: Arc<AggContext>,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl AggExec {
@@ -82,11 +85,16 @@
input,
agg_ctx,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
})
}
}
impl ExecutionPlan for AggExec {
+ fn name(&self) -> &str {
+ "AggExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -95,16 +103,18 @@
self.agg_ctx.output_schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.input.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
@@ -115,6 +125,7 @@
input: children[0].clone(),
agg_ctx: self.agg_ctx.clone(),
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
}))
}
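
This is the first instance of a pattern repeated across every `*_exec.rs` file in this patch: DataFusion 42 replaces `output_partitioning()`/`output_ordering()` with a single `properties() -> &PlanProperties`, so each node memoizes the value in a `OnceCell` to satisfy the borrowed return type. A condensed sketch of the pattern, assuming a node that passes its child's partitioning through:

```rust
use std::sync::Arc;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::{
    ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
use once_cell::sync::OnceCell;

struct PassThrough {
    input: Arc<dyn ExecutionPlan>,
    props: OnceCell<PlanProperties>,
}

impl PassThrough {
    // properties() must return a reference, so the value is built once and
    // cached; partitioning here is simply inherited from the child.
    fn properties(&self) -> &PlanProperties {
        self.props.get_or_init(|| {
            PlanProperties::new(
                EquivalenceProperties::new(self.input.schema()),
                self.input.output_partitioning().clone(),
                ExecutionMode::Bounded,
            )
        })
    }
}
```
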
diff --git a/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs b/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs
index bcfdcf2..aed0c01 100644
--- a/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs
@@ -22,14 +22,16 @@
use datafusion::{
common::Result,
execution::{SendableRecordBatchStream, TaskContext},
- physical_expr::{Partitioning, PhysicalExpr, PhysicalSortExpr},
+ physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr},
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties,
},
};
use futures::{stream::once, StreamExt, TryStreamExt};
+use once_cell::sync::OnceCell;
use crate::{
common::{output::TaskOutputter, timer_helper::TimerHelper},
@@ -40,6 +42,7 @@
input: Arc<dyn ExecutionPlan>,
keys: Vec<Arc<dyn PhysicalExpr>>,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl BroadcastJoinBuildHashMapExec {
@@ -48,6 +51,7 @@
input,
keys,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
}
}
}
@@ -65,6 +69,10 @@
}
impl ExecutionPlan for BroadcastJoinBuildHashMapExec {
+ fn name(&self) -> &str {
+ "BroadcastJoinBuildHashMapExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -73,16 +81,20 @@
join_hash_map_schema(&self.input.schema())
}
- fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(self.input.output_partitioning().partition_count())
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ Partitioning::UnknownPartitioning(
+ self.input.output_partitioning().partition_count(),
+ ),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs b/native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs
index 3da0107..e27d7f7 100644
--- a/native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs
@@ -29,12 +29,13 @@
use datafusion::{
common::{DataFusionError, JoinSide, Result, Statistics},
execution::context::TaskContext,
- physical_expr::{PhysicalExprRef, PhysicalSortExpr},
+ physical_expr::{EquivalenceProperties, PhysicalExprRef},
physical_plan::{
joins::utils::JoinOn,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, Time},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, SendableRecordBatchStream,
},
};
use datafusion_ext_commons::{
@@ -82,6 +83,7 @@
is_built: bool, // true for BroadcastHashJoin, false for ShuffledHashJoin
cached_build_hash_map_id: Option<String>,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl BroadcastJoinExec {
@@ -105,6 +107,7 @@
is_built,
cached_build_hash_map_id,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
})
}
@@ -237,6 +240,10 @@
}
impl ExecutionPlan for BroadcastJoinExec {
+ fn name(&self) -> &str {
+ "BroadcastJoin"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -245,19 +252,21 @@
self.schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- match self.broadcast_side {
- JoinSide::Left => self.right.output_partitioning(),
- JoinSide::Right => self.left.output_partitioning(),
- }
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ match self.broadcast_side {
+ JoinSide::Left => self.right.output_partitioning().clone(),
+ JoinSide::Right => self.left.output_partitioning().clone(),
+ },
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.left.clone(), self.right.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.left, &self.right]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/common/cached_exprs_evaluator.rs b/native-engine/datafusion-ext-plans/src/common/cached_exprs_evaluator.rs
index 9792057..1fa6b99 100644
--- a/native-engine/datafusion-ext-plans/src/common/cached_exprs_evaluator.rs
+++ b/native-engine/datafusion-ext-plans/src/common/cached_exprs_evaluator.rs
@@ -36,8 +36,9 @@
},
physical_expr::{
expressions::{CaseExpr, Column, Literal, NoOp, SCAndExpr, SCOrExpr},
- scatter, PhysicalExpr, PhysicalExprRef,
+ PhysicalExpr, PhysicalExprRef,
},
+ physical_expr_common::utils::scatter,
physical_plan::ColumnarValue,
};
use datafusion_ext_commons::{cast::cast, uda::UserDefinedArray};
@@ -208,8 +209,14 @@
// short circuiting expression - only first child can be cached
// first `when` expr can also be cached
collect_dups(&expr.children()[0], current_count, expr_counts, dups);
- if expr.as_any().downcast_ref::<CaseExpr>().is_some() {
- collect_dups(&expr.children()[1], current_count, expr_counts, dups);
+ if let Some(case_expr) = expr.as_any().downcast_ref::<CaseExpr>() {
+ if case_expr.expr().is_some() {
+ let children = case_expr.children();
+ if children.len() >= 2 {
+ // cache first `when` expr
+ collect_dups(&expr.children()[1], current_count, expr_counts, dups);
+ }
+ }
}
} else {
expr.children().iter().for_each(|child| {
@@ -254,17 +261,25 @@
{
// short circuiting expression - only first child can be cached
// first `when` expr can also be cached
- let mut children = expr.children();
+ let mut children = expr
+ .children()
+ .iter()
+ .map(|&child| child.clone())
+ .collect::<Vec<_>>();
children[0] = transform(children[0].clone(), cached_expr_ids, cache)?;
- if expr.as_any().downcast_ref::<CaseExpr>().is_some() && children.len() >= 2 {
- children[1] = transform(children[1].clone(), cached_expr_ids, cache)?;
+
+ if let Some(case_expr) = expr.as_any().downcast_ref::<CaseExpr>() {
+ if children.len() >= 2 && case_expr.expr().is_some() {
+ // cache first `when` expr
+ children[1] = transform(children[1].clone(), cached_expr_ids, cache)?;
+ }
}
expr.clone().with_new_children(children)?
} else {
expr.clone().with_new_children(
expr.children()
.into_iter()
- .map(|child| transform(child, cached_expr_ids, cache))
+ .map(|child| transform(child.clone(), cached_expr_ids, cache))
.collect::<Result<_>>()?,
)?
};
@@ -349,7 +364,7 @@
self.cache.get(self.id, || self.orig_expr.evaluate(batch))
}
- fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
self.orig_expr.children()
}
@@ -441,12 +456,13 @@
let mapped_idx = *used_cols_ref.entry(col.index()).or_insert(new_idx);
let mapped_col: PhysicalExprRef = Arc::new(Column::new(col.name(), mapped_idx));
- Ok(Transformed::Yes(mapped_col))
+ Ok(Transformed::yes(mapped_col))
} else {
- Ok(Transformed::Yes(expr))
+ Ok(Transformed::yes(expr))
}
})
- .unwrap();
+ .unwrap()
+ .data;
let mapped_cols: Vec<usize> = used_cols
.take()
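
The `Transformed` churn above reflects DataFusion 42's reworked TreeNode API: `Transformed::Yes`/`No` became the lowercase constructors `Transformed::yes`/`no`, and `transform_down` now returns a `Transformed<T>` whose rewritten value must be taken from `.data`. A minimal sketch mirroring the call shape used in this file:

```rust
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::Result;
use datafusion::physical_expr::PhysicalExprRef;

fn rewrite_noop(expr: PhysicalExprRef) -> Result<PhysicalExprRef> {
    let transformed = expr.transform_down(&|node: PhysicalExprRef| {
        Ok(Transformed::yes(node)) // lowercase constructor in v42
    })?;
    Ok(transformed.data) // the rewritten tree lives in `.data`
}
```
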
diff --git a/native-engine/datafusion-ext-plans/src/common/column_pruning.rs b/native-engine/datafusion-ext-plans/src/common/column_pruning.rs
index c8c1a3a..9159024 100644
--- a/native-engine/datafusion-ext-plans/src/common/column_pruning.rs
+++ b/native-engine/datafusion-ext-plans/src/common/column_pruning.rs
@@ -76,16 +76,18 @@
let mapped_exprs: Vec<PhysicalExprRef> = exprs
.iter()
.map(|expr| {
- expr.clone().transform_down(&|node: PhysicalExprRef| {
- Ok(Transformed::Yes(
- if let Some(column) = node.as_any().downcast_ref::<Column>() {
- let mapped_idx = required_columns_mapping[&column.index()];
- Arc::new(Column::new(column.name(), mapped_idx))
- } else {
- node
- },
- ))
- })
+ expr.clone()
+ .transform_down(&|node: PhysicalExprRef| {
+ Ok(Transformed::yes(
+ if let Some(column) = node.as_any().downcast_ref::<Column>() {
+ let mapped_idx = required_columns_mapping[&column.index()];
+ Arc::new(Column::new(column.name(), mapped_idx))
+ } else {
+ node
+ },
+ ))
+ })
+ .map(|r| r.data)
})
.collect::<Result<_>>()?;
diff --git a/native-engine/datafusion-ext-plans/src/debug_exec.rs b/native-engine/datafusion-ext-plans/src/debug_exec.rs
index 76f7790..481204d 100644
--- a/native-engine/datafusion-ext-plans/src/debug_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/debug_exec.rs
@@ -25,20 +25,22 @@
use datafusion::{
error::Result,
execution::context::TaskContext,
- physical_expr::PhysicalSortExpr,
+ physical_expr::EquivalenceProperties,
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
- SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
},
};
use futures::{Stream, StreamExt};
+use once_cell::sync::OnceCell;
#[derive(Debug)]
pub struct DebugExec {
input: Arc<dyn ExecutionPlan>,
debug_id: String,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl DebugExec {
@@ -47,6 +49,7 @@
input,
debug_id,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
}
}
}
@@ -59,6 +62,10 @@
#[async_trait]
impl ExecutionPlan for DebugExec {
+ fn name(&self) -> &str {
+ "DebugExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -67,16 +74,18 @@
self.input.schema()
}
- fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.input.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.input.output_ordering()
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/empty_partitions_exec.rs b/native-engine/datafusion-ext-plans/src/empty_partitions_exec.rs
index d3a6588..e335de1 100644
--- a/native-engine/datafusion-ext-plans/src/empty_partitions_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/empty_partitions_exec.rs
@@ -25,19 +25,21 @@
use datafusion::{
error::Result,
execution::context::TaskContext,
- physical_expr::PhysicalSortExpr,
+ physical_expr::EquivalenceProperties,
physical_plan::{
- metrics::MetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
- Partitioning::UnknownPartitioning, RecordBatchStream, SendableRecordBatchStream,
- Statistics,
+ metrics::MetricsSet, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
+ Partitioning::UnknownPartitioning, PlanProperties, RecordBatchStream,
+ SendableRecordBatchStream, Statistics,
},
};
use futures::Stream;
+use once_cell::sync::OnceCell;
#[derive(Debug, Clone)]
pub struct EmptyPartitionsExec {
schema: SchemaRef,
num_partitions: usize,
+ props: OnceCell<PlanProperties>,
}
impl EmptyPartitionsExec {
@@ -45,6 +47,7 @@
Self {
schema,
num_partitions,
+ props: OnceCell::new(),
}
}
}
@@ -61,6 +64,10 @@
#[async_trait]
impl ExecutionPlan for EmptyPartitionsExec {
+ fn name(&self) -> &str {
+ "EmptyPartitionsExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -69,15 +76,17 @@
self.schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- UnknownPartitioning(self.num_partitions)
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ UnknownPartitioning(self.num_partitions),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
diff --git a/native-engine/datafusion-ext-plans/src/expand_exec.rs b/native-engine/datafusion-ext-plans/src/expand_exec.rs
index b1120be..71e0791 100644
--- a/native-engine/datafusion-ext-plans/src/expand_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/expand_exec.rs
@@ -21,17 +21,17 @@
use datafusion::{
common::{Result, Statistics},
execution::context::TaskContext,
- physical_expr::{PhysicalExpr, PhysicalSortExpr},
+ physical_expr::{EquivalenceProperties, PhysicalExpr},
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
- Partitioning::UnknownPartitioning,
- SendableRecordBatchStream,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, SendableRecordBatchStream,
},
};
use datafusion_ext_commons::{cast::cast, df_execution_err};
use futures::{stream::once, StreamExt, TryStreamExt};
+use once_cell::sync::OnceCell;
use crate::common::output::TaskOutputter;
@@ -41,6 +41,7 @@
projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
input: Arc<dyn ExecutionPlan>,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl ExpandExec {
@@ -68,6 +69,7 @@
projections,
input,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
})
}
}
@@ -79,6 +81,10 @@
}
impl ExecutionPlan for ExpandExec {
+ fn name(&self) -> &str {
+ "ExpandExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -87,16 +93,18 @@
self.schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- UnknownPartitioning(0)
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.input.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
@@ -108,6 +116,7 @@
projections: self.projections.clone(),
input: children[0].clone(),
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
}))
}
@@ -386,7 +395,7 @@
"| 103.4 |",
"| 0.6 |",
"| 1.15 |",
- "| 0.0 |",
+ "| -0.0 |",
"| -1.7 |",
"| -1.2 |",
"| -0.29999995 |",
diff --git a/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
index f79c865..10e7ea9 100644
--- a/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
@@ -23,22 +23,24 @@
use datafusion::{
error::Result,
execution::context::TaskContext,
- physical_expr::PhysicalSortExpr,
+ physical_expr::EquivalenceProperties,
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
Partitioning::UnknownPartitioning,
- SendableRecordBatchStream, Statistics,
+ PlanProperties, SendableRecordBatchStream, Statistics,
},
};
use datafusion_ext_commons::streams::ffi_stream::FFIReaderStream;
use jni::objects::JObject;
+use once_cell::sync::OnceCell;
pub struct FFIReaderExec {
num_partitions: usize,
schema: SchemaRef,
export_iter_provider_resource_id: String,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl FFIReaderExec {
@@ -52,6 +54,7 @@
export_iter_provider_resource_id,
schema,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
}
}
}
@@ -69,6 +72,10 @@
}
impl ExecutionPlan for FFIReaderExec {
+ fn name(&self) -> &str {
+ "FFIReaderExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -77,15 +84,17 @@
self.schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- UnknownPartitioning(self.num_partitions)
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ UnknownPartitioning(self.num_partitions),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
diff --git a/native-engine/datafusion-ext-plans/src/filter_exec.rs b/native-engine/datafusion-ext-plans/src/filter_exec.rs
index fea0a92..ca10294 100644
--- a/native-engine/datafusion-ext-plans/src/filter_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/filter_exec.rs
@@ -18,16 +18,18 @@
use datafusion::{
common::{Result, Statistics},
execution::context::TaskContext,
- physical_expr::{expressions::Column, PhysicalExprRef, PhysicalSortExpr},
+ physical_expr::{expressions::Column, EquivalenceProperties, PhysicalExprRef},
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, SendableRecordBatchStream,
},
};
use datafusion_ext_commons::{df_execution_err, streams::coalesce_stream::CoalesceInput};
use futures::{stream::once, StreamExt, TryStreamExt};
use itertools::Itertools;
+use once_cell::sync::OnceCell;
use crate::{
common::{
@@ -44,6 +46,7 @@
input: Arc<dyn ExecutionPlan>,
predicates: Vec<PhysicalExprRef>,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl FilterExec {
@@ -66,6 +69,7 @@
input,
predicates,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
})
}
@@ -85,6 +89,10 @@
}
impl ExecutionPlan for FilterExec {
+ fn name(&self) -> &str {
+ "FilterExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -93,16 +101,18 @@
self.input.schema()
}
- fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.input.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.input.output_ordering()
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/generate_exec.rs b/native-engine/datafusion-ext-plans/src/generate_exec.rs
index 6c7c7fc..c8d1d99 100644
--- a/native-engine/datafusion-ext-plans/src/generate_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/generate_exec.rs
@@ -27,16 +27,18 @@
use datafusion::{
common::{Result, Statistics},
execution::context::TaskContext,
- physical_expr::{expressions::Column, PhysicalExpr, PhysicalSortExpr},
+ physical_expr::{expressions::Column, EquivalenceProperties, PhysicalExpr},
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, SendableRecordBatchStream,
},
};
use datafusion_ext_commons::{batch_size, cast::cast, streams::coalesce_stream::CoalesceInput};
use futures::{stream::once, StreamExt, TryFutureExt, TryStreamExt};
use num::integer::Roots;
+use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use crate::{
@@ -57,6 +59,7 @@
input: Arc<dyn ExecutionPlan>,
output_schema: SchemaRef,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl GenerateExec {
@@ -94,6 +97,7 @@
input,
output_schema,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
})
}
@@ -117,6 +121,10 @@
}
impl ExecutionPlan for GenerateExec {
+ fn name(&self) -> &str {
+ "GenerateExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -125,16 +133,18 @@
self.output_schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.input.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
index 11ed91d..9396e13 100644
--- a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
@@ -35,13 +35,13 @@
use datafusion::{
error::{DataFusionError, Result},
execution::context::TaskContext,
+ physical_expr::EquivalenceProperties,
physical_plan::{
- expressions::PhysicalSortExpr,
metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
Partitioning::UnknownPartitioning,
- SendableRecordBatchStream, Statistics,
+ PlanProperties, SendableRecordBatchStream, Statistics,
},
};
use datafusion_ext_commons::{
@@ -50,6 +50,7 @@
};
use futures::{stream::once, TryStreamExt};
use jni::objects::{GlobalRef, JObject};
+use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use crate::common::{ipc_compression::IpcCompressionReader, output::TaskOutputter};
@@ -60,6 +61,7 @@
pub ipc_provider_resource_id: String,
pub schema: SchemaRef,
pub metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl IpcReaderExec {
pub fn new(
@@ -72,6 +74,7 @@
ipc_provider_resource_id,
schema,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
}
}
}
@@ -84,6 +87,10 @@
#[async_trait]
impl ExecutionPlan for IpcReaderExec {
+ fn name(&self) -> &str {
+ "IpcReaderExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -92,15 +99,17 @@
self.schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- UnknownPartitioning(self.num_partitions)
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ UnknownPartitioning(self.num_partitions),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
diff --git a/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs
index 6aa5544..c22d532 100644
--- a/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs
@@ -22,17 +22,18 @@
use datafusion::{
error::Result,
execution::context::TaskContext,
- physical_expr::PhysicalSortExpr,
+ physical_expr::EquivalenceProperties,
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
- Statistics,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, SendableRecordBatchStream, Statistics,
},
};
use datafusion_ext_commons::streams::coalesce_stream::CoalesceInput;
use futures::{stream::once, StreamExt, TryStreamExt};
use jni::objects::{GlobalRef, JObject};
+use once_cell::sync::OnceCell;
use crate::common::{
ipc_compression::IpcCompressionWriter, output::TaskOutputter, timer_helper::TimerHelper,
@@ -43,6 +44,7 @@
input: Arc<dyn ExecutionPlan>,
ipc_consumer_resource_id: String,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl IpcWriterExec {
@@ -51,6 +53,7 @@
input,
ipc_consumer_resource_id,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
}
}
}
@@ -63,6 +66,10 @@
#[async_trait]
impl ExecutionPlan for IpcWriterExec {
+ fn name(&self) -> &str {
+ "IpcWriterExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -71,16 +78,18 @@
self.input.schema()
}
- fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.input.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.input.output_ordering()
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/lib.rs b/native-engine/datafusion-ext-plans/src/lib.rs
index 052eaf2..cd9bc49 100644
--- a/native-engine/datafusion-ext-plans/src/lib.rs
+++ b/native-engine/datafusion-ext-plans/src/lib.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#![allow(incomplete_features)]
+#![allow(internal_features)]
#![feature(adt_const_params)]
#![feature(core_intrinsics)]
#![feature(get_mut_unchecked)]
@@ -50,6 +52,7 @@
pub mod common;
pub mod generate;
pub mod joins;
+mod scan;
mod shuffle;
pub mod window;
diff --git a/native-engine/datafusion-ext-plans/src/limit_exec.rs b/native-engine/datafusion-ext-plans/src/limit_exec.rs
index 5bdd729..825c89c 100644
--- a/native-engine/datafusion-ext-plans/src/limit_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/limit_exec.rs
@@ -10,20 +10,22 @@
use datafusion::{
common::Result,
execution::context::TaskContext,
- physical_expr::PhysicalSortExpr,
+ physical_expr::EquivalenceProperties,
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
- SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
},
};
use futures::{Stream, StreamExt};
+use once_cell::sync::OnceCell;
#[derive(Debug)]
pub struct LimitExec {
input: Arc<dyn ExecutionPlan>,
limit: u64,
pub metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl LimitExec {
@@ -32,6 +34,7 @@
input,
limit,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
}
}
}
@@ -43,6 +46,10 @@
}
impl ExecutionPlan for LimitExec {
+ fn name(&self) -> &str {
+ "LimitExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -51,16 +58,18 @@
self.input.schema()
}
- fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.input.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.input.output_ordering()
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs
index dfca0d9..4ae1139 100644
--- a/native-engine/datafusion-ext-plans/src/orc_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -21,27 +21,32 @@
use blaze_jni_bridge::{jni_call_static, jni_new_global_ref, jni_new_string};
use bytes::Bytes;
use datafusion::{
- datasource::physical_plan::{
- FileMeta, FileOpenFuture, FileOpener, FileScanConfig, FileStream, SchemaAdapter,
+ datasource::{
+ physical_plan::{FileMeta, FileOpenFuture, FileOpener, FileScanConfig, FileStream},
+ schema_adapter::SchemaAdapter,
},
error::Result,
execution::context::TaskContext,
+ physical_expr::EquivalenceProperties,
physical_plan::{
- expressions::PhysicalSortExpr,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning, PhysicalExpr,
- RecordBatchStream, SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Metric, Partitioning,
+ PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
},
};
-use datafusion_ext_commons::{batch_size, hadoop_fs::FsProvider};
+use datafusion_ext_commons::{batch_size, df_execution_err, hadoop_fs::FsProvider};
use futures::{future::BoxFuture, FutureExt, StreamExt};
use futures_util::{stream::once, TryStreamExt};
+use once_cell::sync::OnceCell;
use orc_rust::{
arrow_reader::ArrowReaderBuilder, projection::ProjectionMask, reader::AsyncChunkReader,
};
-use crate::common::{internal_file_reader::InternalFileReader, output::TaskOutputter};
+use crate::{
+ common::{internal_file_reader::InternalFileReader, output::TaskOutputter},
+ scan::BlazeSchemaAdapter,
+};
/// Execution plan for scanning one or more Orc partitions
#[derive(Debug, Clone)]
@@ -50,9 +55,9 @@
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
- projected_output_ordering: Vec<Vec<PhysicalSortExpr>>,
metrics: ExecutionPlanMetricsSet,
_predicate: Option<Arc<dyn PhysicalExpr>>,
+ props: OnceCell<PlanProperties>,
}
impl OrcExec {
@@ -65,7 +70,7 @@
) -> Self {
let metrics = ExecutionPlanMetricsSet::new();
- let (projected_schema, projected_statistics, projected_output_ordering) =
+ let (projected_schema, projected_statistics, _projected_output_ordering) =
base_config.project();
Self {
@@ -73,9 +78,9 @@
base_config,
projected_statistics,
projected_schema,
- projected_output_ordering,
metrics,
_predicate,
+ props: OnceCell::new(),
}
}
}
@@ -101,6 +106,10 @@
}
impl ExecutionPlan for OrcExec {
+ fn name(&self) -> &str {
+ "OrcExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -109,17 +118,17 @@
Arc::clone(&self.projected_schema)
}
- fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ Partitioning::UnknownPartitioning(self.base_config.file_groups.len()),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.projected_output_ordering
- .first()
- .map(|ordering| ordering.as_slice())
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
@@ -218,18 +227,20 @@
let batch_size = self.batch_size;
let projection = self.projection.clone();
let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
- let schema_adapter = SchemaAdapter::new(projected_schema);
+ let schema_adapter = BlazeSchemaAdapter::new(projected_schema);
Ok(Box::pin(async move {
let mut builder = ArrowReaderBuilder::try_new_async(reader)
.await
- .map_err(ArrowError::from)?;
+ .or_else(|err| df_execution_err!("create orc reader error: {err}"))?;
if let Some(range) = file_meta.range.clone() {
let range = range.start as usize..range.end as usize;
builder = builder.with_file_byte_range(range);
}
let file_schema = builder.schema();
- let (schema_mapping, adapted_projections) = schema_adapter.map_schema(&file_schema)?;
+ let (schema_mapping, adapted_projections) =
+ schema_adapter.map_schema(file_schema.as_ref())?;
+
// Offset by 1 since index 0 is the root
let projection = adapted_projections
.iter()
diff --git a/native-engine/datafusion-ext-plans/src/parquet_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
index 5291218..28e2591 100644
--- a/native-engine/datafusion-ext-plans/src/parquet_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
@@ -19,18 +19,14 @@
use std::{any::Any, fmt, fmt::Formatter, ops::Range, pin::Pin, sync::Arc};
-use arrow::{
- array::{Array, ArrayRef, AsArray, ListArray},
- datatypes::{DataType, SchemaRef},
-};
+use arrow::datatypes::SchemaRef;
use blaze_jni_bridge::{
conf, conf::BooleanConf, jni_call_static, jni_new_global_ref, jni_new_string,
};
use bytes::Bytes;
use datafusion::{
- common::DataFusionError,
datasource::physical_plan::{
- parquet::{page_filter::PagePruningPredicate, ParquetOpener},
+ parquet::{page_filter::PagePruningAccessPlanFilter, ParquetOpener},
FileMeta, FileScanConfig, FileStream, OnError, ParquetFileMetrics,
ParquetFileReaderFactory,
},
@@ -41,87 +37,28 @@
errors::ParquetError,
file::metadata::ParquetMetaData,
},
+ physical_expr::EquivalenceProperties,
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
- expressions::PhysicalSortExpr,
metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet, Time,
},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning, PhysicalExpr,
- RecordBatchStream, SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Metric, Partitioning,
+ PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
},
};
-use datafusion_ext_commons::{batch_size, df_execution_err, hadoop_fs::FsProvider};
+use datafusion_ext_commons::{batch_size, hadoop_fs::FsProvider};
use fmt::Debug;
use futures::{future::BoxFuture, stream::once, FutureExt, StreamExt, TryStreamExt};
use object_store::ObjectMeta;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
-use crate::common::{internal_file_reader::InternalFileReader, output::TaskOutputter};
-
-#[no_mangle]
-fn schema_adapter_cast_column(
- col: &ArrayRef,
- data_type: &DataType,
-) -> Result<ArrayRef, DataFusionError> {
- macro_rules! handle_decimal {
- ($s:ident, $t:ident, $tnative:ty, $prec:expr, $scale:expr) => {{
- use arrow::{array::*, datatypes::*};
- type DecimalBuilder = paste::paste! {[<$t Builder>]};
- type IntType = paste::paste! {[<$s Type>]};
-
- let col = col.as_primitive::<IntType>();
- let mut decimal_builder = DecimalBuilder::new();
- for i in 0..col.len() {
- if col.is_valid(i) {
- decimal_builder.append_value(col.value(i) as $tnative);
- } else {
- decimal_builder.append_null();
- }
- }
- Ok(Arc::new(
- decimal_builder
- .finish()
- .with_precision_and_scale($prec, $scale)?,
- ))
- }};
- }
- match data_type {
- DataType::Decimal128(prec, scale) => match col.data_type() {
- DataType::Int8 => handle_decimal!(Int8, Decimal128, i128, *prec, *scale),
- DataType::Int16 => handle_decimal!(Int16, Decimal128, i128, *prec, *scale),
- DataType::Int32 => handle_decimal!(Int32, Decimal128, i128, *prec, *scale),
- DataType::Int64 => handle_decimal!(Int64, Decimal128, i128, *prec, *scale),
- DataType::Decimal128(p, s) if p == prec && s == scale => Ok(col.clone()),
- _ => df_execution_err!(
- "schema_adapter_cast_column unsupported type: {:?} => {:?}",
- col.data_type(),
- data_type,
- ),
- },
- DataType::List(to_field) => match col.data_type() {
- DataType::List(_from_field) => {
- let col = col.as_list::<i32>();
- let from_inner = col.values();
- let to_inner = schema_adapter_cast_column(from_inner, to_field.data_type())?;
- Ok(Arc::new(ListArray::try_new(
- to_field.clone(),
- col.offsets().clone(),
- to_inner,
- col.nulls().cloned(),
- )?))
- }
- _ => df_execution_err!(
- "schema_adapter_cast_column unsupported type: {:?} => {:?}",
- col.data_type(),
- data_type,
- ),
- },
- _ => datafusion_ext_commons::cast::cast_scan_input_array(col.as_ref(), data_type),
- }
-}
+use crate::{
+ common::{internal_file_reader::InternalFileReader, output::TaskOutputter},
+ scan::BlazeSchemaAdapterFactory,
+};
/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
@@ -130,11 +67,11 @@
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
- projected_output_ordering: Vec<Vec<PhysicalSortExpr>>,
metrics: ExecutionPlanMetricsSet,
predicate: Option<Arc<dyn PhysicalExpr>>,
pruning_predicate: Option<Arc<PruningPredicate>>,
- page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
+ page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
+ props: OnceCell<PlanProperties>,
}
impl ParquetExec {
@@ -162,20 +99,13 @@
}
}
})
- .filter(|p| !p.allways_true());
+ .filter(|p| !p.always_true());
- let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| {
- match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) {
- Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
- Err(e) => {
- log::warn!("Could not create page pruning predicate: {}", e);
- predicate_creation_errors.add(1);
- None
- }
- }
- });
+ let page_pruning_predicate = predicate
+ .as_ref()
+ .map(|p| Arc::new(PagePruningAccessPlanFilter::new(p, file_schema.clone())));
- let (projected_schema, projected_statistics, projected_output_ordering) =
+ let (projected_schema, projected_statistics, _projected_output_ordering) =
base_config.project();
Self {
@@ -183,11 +113,11 @@
base_config,
projected_schema,
projected_statistics,
- projected_output_ordering,
metrics,
predicate,
pruning_predicate,
page_pruning_predicate,
+ props: OnceCell::new(),
}
}
}
@@ -217,6 +147,10 @@
}
impl ExecutionPlan for ParquetExec {
+ fn name(&self) -> &str {
+ "ParquetExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -225,25 +159,20 @@
Arc::clone(&self.projected_schema)
}
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
- fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ Partitioning::UnknownPartitioning(self.base_config.file_groups.len()),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.projected_output_ordering
- .first()
- .map(|ordering| ordering.as_slice())
- }
-
- // in datafusion 20.0.0 ExecutionPlan trait not include relies_on_input_order
- // fn relies_on_input_order(&self) -> bool {
- // false
- // }
-
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
@@ -275,6 +204,7 @@
let fs = jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?;
let fs_provider = Arc::new(FsProvider::new(jni_new_global_ref!(fs.as_obj())?, &io_time));
+ let schema_adapter_factory = Arc::new(BlazeSchemaAdapterFactory);
let projection = match self.base_config.file_column_projection_indices() {
Some(proj) => proj,
None => (0..self.base_config.file_schema.fields().len()).collect(),
@@ -299,6 +229,7 @@
reorder_filters: page_filtering_enabled,
enable_page_index: page_filtering_enabled,
enable_bloom_filter: bloom_filter_enabled,
+ schema_adapter_factory,
};
let mut file_stream =
@@ -434,7 +365,7 @@
let inner = self.0.clone();
let meta_size = inner.get_meta().size;
- let size_hint = Some(1048576);
+ let size_hint = None;
let cache_slot = (move || {
let mut metadata_cache = METADATA_CACHE.get_or_init(|| Mutex::new(Vec::new())).lock();
diff --git a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
index 00ef3f1..8b00a7d 100644
--- a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
@@ -31,12 +31,12 @@
file::properties::{EnabledStatistics, WriterProperties, WriterVersion},
schema::{parser::parse_message_type, types::SchemaDescriptor},
},
- physical_expr::PhysicalSortExpr,
+ physical_expr::EquivalenceProperties,
physical_plan::{
metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning,
- SendableRecordBatchStream,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ Metric, PlanProperties, SendableRecordBatchStream,
},
};
use datafusion_ext_commons::{
@@ -46,6 +46,7 @@
hadoop_fs::{FsDataOutputStream, FsProvider},
};
use futures::{stream::once, StreamExt, TryStreamExt};
+use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use crate::common::output::TaskOutputter;
@@ -57,6 +58,7 @@
num_dyn_parts: usize,
props: Vec<(String, String)>,
metrics: ExecutionPlanMetricsSet,
+ plan_props: OnceCell<PlanProperties>,
}
impl ParquetSinkExec {
@@ -72,6 +74,7 @@
num_dyn_parts,
props,
metrics: ExecutionPlanMetricsSet::new(),
+ plan_props: OnceCell::new(),
}
}
}
@@ -83,6 +86,10 @@
}
impl ExecutionPlan for ParquetSinkExec {
+ fn name(&self) -> &str {
+ "ParquetSinkExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -91,16 +98,18 @@
self.input.schema()
}
- fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ fn properties(&self) -> &PlanProperties {
+ self.plan_props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.input.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.input.output_ordering()
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/project_exec.rs b/native-engine/datafusion-ext-plans/src/project_exec.rs
index ef974d8..617df65 100644
--- a/native-engine/datafusion-ext-plans/src/project_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/project_exec.rs
@@ -21,16 +21,18 @@
use datafusion::{
common::{Result, Statistics},
execution::TaskContext,
- physical_expr::{PhysicalExprRef, PhysicalSortExpr},
+ physical_expr::{EquivalenceProperties, PhysicalExprRef},
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, SendableRecordBatchStream,
},
};
use datafusion_ext_commons::streams::coalesce_stream::CoalesceInput;
use futures::{stream::once, FutureExt, StreamExt, TryStreamExt};
use itertools::Itertools;
+use once_cell::sync::OnceCell;
use crate::{
common::{
@@ -48,6 +50,7 @@
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl ProjectExec {
@@ -73,6 +76,7 @@
input,
schema,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
})
}
}
@@ -91,6 +95,10 @@
}
impl ExecutionPlan for ProjectExec {
+ fn name(&self) -> &str {
+ "ProjectExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -99,16 +107,18 @@
self.schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(self.input.output_partitioning().partition_count())
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.input.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
@@ -181,6 +191,7 @@
expr: projection.iter().map(|&i| self.expr[i].clone()).collect(),
schema: Arc::new(self.schema.project(projection)?),
metrics: self.metrics.clone(),
+ props: OnceCell::new(),
});
projected_project.execute(partition, context)
}
diff --git a/native-engine/datafusion-ext-plans/src/rename_columns_exec.rs b/native-engine/datafusion-ext-plans/src/rename_columns_exec.rs
index 69b46cf..1642a40 100644
--- a/native-engine/datafusion-ext-plans/src/rename_columns_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/rename_columns_exec.rs
@@ -28,14 +28,15 @@
use datafusion::{
error::Result,
execution::context::TaskContext,
- physical_expr::PhysicalSortExpr,
+ physical_expr::EquivalenceProperties,
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
- SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
},
};
use futures::{Stream, StreamExt};
+use once_cell::sync::OnceCell;
use crate::agg::AGG_BUF_COLUMN_NAME;
@@ -45,6 +46,7 @@
renamed_column_names: Vec<String>,
renamed_schema: SchemaRef,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl RenameColumnsExec {
@@ -88,6 +90,7 @@
renamed_column_names,
renamed_schema,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
})
}
}
@@ -100,6 +103,10 @@
#[async_trait]
impl ExecutionPlan for RenameColumnsExec {
+ fn name(&self) -> &str {
+ "RenameColumnsExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -108,16 +115,18 @@
self.renamed_schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.input.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.input.output_ordering()
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs b/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs
index bec2aa6..af981c5 100644
--- a/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs
@@ -22,15 +22,16 @@
arrow::datatypes::SchemaRef,
error::{DataFusionError, Result},
execution::context::TaskContext,
+ physical_expr::EquivalenceProperties,
physical_plan::{
- expressions::PhysicalSortExpr,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
- Statistics,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
+ SendableRecordBatchStream, Statistics,
},
};
use futures::{stream::once, TryStreamExt};
+use once_cell::sync::OnceCell;
use crate::{
common::timer_helper::RegisterTimer,
@@ -46,14 +47,11 @@
/// order of the resulting partitions.
#[derive(Debug)]
pub struct RssShuffleWriterExec {
- /// Input execution plan
input: Arc<dyn ExecutionPlan>,
- /// Partitioning scheme to use
partitioning: Partitioning,
- /// scala rssShuffleWriter
pub rss_partition_writer_resource_id: String,
- /// Metrics
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl DisplayAs for RssShuffleWriterExec {
@@ -68,26 +66,30 @@
#[async_trait]
impl ExecutionPlan for RssShuffleWriterExec {
- /// Return a reference to Any that can be used for downcasting
+ fn name(&self) -> &str {
+ "RssShuffleWriterExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
- /// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
self.input.schema()
}
- fn output_partitioning(&self) -> Partitioning {
- self.partitioning.clone()
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.partitioning.clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
@@ -172,6 +174,7 @@
partitioning,
rss_partition_writer_resource_id,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
})
}
}
diff --git a/native-engine/datafusion-ext-plans/src/scan/mod.rs b/native-engine/datafusion-ext-plans/src/scan/mod.rs
new file mode 100644
index 0000000..f616d5d
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/scan/mod.rs
@@ -0,0 +1,187 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{fmt::Debug, sync::Arc};
+
+use arrow::{
+ array::{new_null_array, Array, ArrayRef, AsArray, ListArray, RecordBatch, RecordBatchOptions},
+ datatypes::{DataType, Schema, SchemaRef},
+};
+use datafusion::{
+ common::Result,
+ datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper},
+};
+use datafusion_ext_commons::df_execution_err;
+
+#[derive(Debug)]
+pub struct BlazeSchemaAdapterFactory;
+
+impl SchemaAdapterFactory for BlazeSchemaAdapterFactory {
+ fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+ Box::new(BlazeSchemaAdapter::new(schema))
+ }
+}
+
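+/// Adapts the physical schema of each scanned file to the table schema,
+/// resolving column names case-insensitively.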
+pub struct BlazeSchemaAdapter {
+ table_schema: SchemaRef,
+}
+
+impl BlazeSchemaAdapter {
+ pub fn new(table_schema: SchemaRef) -> Self {
+ Self { table_schema }
+ }
+}
+
+impl SchemaAdapter for BlazeSchemaAdapter {
+ fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+ let field = self.table_schema.field(index);
+
+ // use case-insensitive matching
+ file_schema
+ .fields
+ .iter()
+ .position(|f| f.name().eq_ignore_ascii_case(field.name()))
+ }
+
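+ /// Computes the projection of file columns that also exist in the table
+ /// schema, recording for each table column its position in that projection.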
+ fn map_schema(&self, file_schema: &Schema) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+ let mut projection = Vec::with_capacity(file_schema.fields().len());
+ let mut field_mappings = vec![None; self.table_schema.fields().len()];
+
+ for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+ if let Some((table_idx, _table_field)) =
+ self.table_schema.fields().find(file_field.name())
+ {
+ field_mappings[table_idx] = Some(projection.len());
+ projection.push(file_idx);
+ }
+ }
+
+ Ok((
+ Arc::new(BlazeSchemaMapping {
+ table_schema: self.table_schema.clone(),
+ field_mappings,
+ }),
+ projection,
+ ))
+ }
+}
+
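+/// Maps record batches read with the file schema onto the table schema:
+/// columns are reordered and cast, and table columns absent from the file
+/// are filled with nulls.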
+#[derive(Debug)]
+pub struct BlazeSchemaMapping {
+ table_schema: SchemaRef,
+ field_mappings: Vec<Option<usize>>,
+}
+
+impl SchemaMapper for BlazeSchemaMapping {
+ fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+ let batch_rows = batch.num_rows();
+ let batch_cols = batch.columns().to_vec();
+
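+ // for each table column, cast the mapped file column or synthesize an all-null column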
+ let cols = self
+ .table_schema
+ .fields()
+ .iter()
+ .zip(&self.field_mappings)
+ .map(|(field, file_idx)| match file_idx {
+ Some(batch_idx) => {
+ schema_adapter_cast_column(&batch_cols[*batch_idx], field.data_type())
+ }
+ None => Ok(new_null_array(field.data_type(), batch_rows)),
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+ let schema = self.table_schema.clone();
+ let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+ Ok(record_batch)
+ }
+
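+ /// Like map_batch, but keeps the batch's own column order and drops
+ /// columns that do not appear in the table schema.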
+ fn map_partial_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+ let batch_cols = batch.columns().to_vec();
+ let schema = batch.schema();
+
+ let mut cols = vec![];
+ let mut fields = vec![];
+ for (i, f) in schema.fields().iter().enumerate() {
+ let table_field = self.table_schema.field_with_name(f.name());
+ if let Ok(tf) = table_field {
+ cols.push(schema_adapter_cast_column(&batch_cols[i], tf.data_type())?);
+ fields.push(tf.clone());
+ }
+ }
+
+ let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+ let schema = Arc::new(Schema::new(fields));
+ let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+ Ok(record_batch)
+ }
+}
+
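+/// Casts one column to the requested type, with special handling for
+/// integer -> Decimal128 widening and element-wise casts inside lists;
+/// all other combinations fall back to the common scan-input cast.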
+fn schema_adapter_cast_column(col: &ArrayRef, data_type: &DataType) -> Result<ArrayRef> {
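+ // copies an integer primitive array into a decimal builder, preserving
+ // nulls, then applies the target precision and scale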
+ macro_rules! handle_decimal {
+ ($s:ident, $t:ident, $tnative:ty, $prec:expr, $scale:expr) => {{
+ use arrow::{array::*, datatypes::*};
+ type DecimalBuilder = paste::paste! {[<$t Builder>]};
+ type IntType = paste::paste! {[<$s Type>]};
+
+ let col = col.as_primitive::<IntType>();
+ let mut decimal_builder = DecimalBuilder::new();
+ for i in 0..col.len() {
+ if col.is_valid(i) {
+ decimal_builder.append_value(col.value(i) as $tnative);
+ } else {
+ decimal_builder.append_null();
+ }
+ }
+ Ok(Arc::new(
+ decimal_builder
+ .finish()
+ .with_precision_and_scale($prec, $scale)?,
+ ))
+ }};
+ }
+ match data_type {
+ DataType::Decimal128(prec, scale) => match col.data_type() {
+ DataType::Int8 => handle_decimal!(Int8, Decimal128, i128, *prec, *scale),
+ DataType::Int16 => handle_decimal!(Int16, Decimal128, i128, *prec, *scale),
+ DataType::Int32 => handle_decimal!(Int32, Decimal128, i128, *prec, *scale),
+ DataType::Int64 => handle_decimal!(Int64, Decimal128, i128, *prec, *scale),
+ DataType::Decimal128(p, s) if p == prec && s == scale => Ok(col.clone()),
+ _ => df_execution_err!(
+ "schema_adapter_cast_column unsupported type: {:?} => {:?}",
+ col.data_type(),
+ data_type,
+ ),
+ },
+ DataType::List(to_field) => match col.data_type() {
+ DataType::List(_from_field) => {
+ let col = col.as_list::<i32>();
+ let from_inner = col.values();
+ let to_inner = schema_adapter_cast_column(from_inner, to_field.data_type())?;
+ Ok(Arc::new(ListArray::try_new(
+ to_field.clone(),
+ col.offsets().clone(),
+ to_inner,
+ col.nulls().cloned(),
+ )?))
+ }
+ _ => df_execution_err!(
+ "schema_adapter_cast_column unsupported type: {:?} => {:?}",
+ col.data_type(),
+ data_type,
+ ),
+ },
+ _ => datafusion_ext_commons::cast::cast_scan_input_array(col.as_ref(), data_type),
+ }
+}
diff --git a/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs b/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs
index d375d42..c389dcb 100644
--- a/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs
@@ -21,16 +21,17 @@
use datafusion::{
error::Result,
execution::context::TaskContext,
+ physical_expr::EquivalenceProperties,
physical_plan::{
- expressions::PhysicalSortExpr,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
- Statistics,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
+ SendableRecordBatchStream, Statistics,
},
};
use datafusion_ext_commons::df_execution_err;
use futures::{stream::once, TryStreamExt};
+use once_cell::sync::OnceCell;
use crate::{
common::{
@@ -49,16 +50,12 @@
/// the resulting partitions.
#[derive(Debug)]
pub struct ShuffleWriterExec {
- /// Input execution plan
input: Arc<dyn ExecutionPlan>,
- /// Partitioning scheme to use
partitioning: Partitioning,
- /// Output data file path
output_data_file: String,
- /// Output index file path
output_index_file: String,
- /// Metrics
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl DisplayAs for ShuffleWriterExec {
@@ -69,26 +66,30 @@
#[async_trait]
impl ExecutionPlan for ShuffleWriterExec {
- /// Return a reference to Any that can be used for downcasting
+ fn name(&self) -> &str {
+ "ShuffleWriterExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
- /// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
self.input.schema()
}
- fn output_partitioning(&self) -> Partitioning {
- self.partitioning.clone()
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.partitioning.clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
@@ -179,6 +180,7 @@
metrics: ExecutionPlanMetricsSet::new(),
output_data_file,
output_index_file,
+ props: OnceCell::new(),
})
}
}
diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs b/native-engine/datafusion-ext-plans/src/sort_exec.rs
index 336932b..8d7a3a9 100644
--- a/native-engine/datafusion-ext-plans/src/sort_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs
@@ -36,11 +36,12 @@
use datafusion::{
common::{Result, Statistics},
execution::context::TaskContext,
- physical_expr::{expressions::Column, PhysicalSortExpr},
+ physical_expr::{expressions::Column, EquivalenceProperties, PhysicalSortExpr},
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, SendableRecordBatchStream,
},
};
use datafusion_ext_commons::{
@@ -81,6 +82,7 @@
exprs: Vec<PhysicalSortExpr>,
fetch: Option<usize>,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl SortExec {
@@ -95,6 +97,7 @@
exprs,
fetch,
metrics,
+ props: OnceCell::new(),
}
}
}
@@ -112,6 +115,10 @@
}
impl ExecutionPlan for SortExec {
+ fn name(&self) -> &str {
+ "SortExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -120,28 +127,29 @@
self.input.schema()
}
- fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.input.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- Some(&self.exprs)
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(Self {
- input: children[0].clone(),
- exprs: self.exprs.clone(),
- fetch: self.fetch,
- metrics: ExecutionPlanMetricsSet::new(),
- }))
+ Ok(Arc::new(Self::new(
+ children[0].clone(),
+ self.exprs.clone(),
+ self.fetch,
+ )))
}
fn execute(
@@ -1352,12 +1360,15 @@
mod fuzztest {
use std::sync::Arc;
- use arrow::{compute::SortOptions, record_batch::RecordBatch};
+ use arrow::{
+ array::{ArrayRef, UInt32Array},
+ compute::{concat_batches, SortOptions},
+ record_batch::RecordBatch,
+ };
use datafusion::{
- common::{Result, ScalarValue},
- logical_expr::ColumnarValue,
- physical_expr::{expressions::Column, math_expressions::random, PhysicalSortExpr},
- physical_plan::{coalesce_batches::concat_batches, memory::MemoryExec},
+ common::Result,
+ physical_expr::{expressions::Column, PhysicalSortExpr},
+ physical_plan::memory::MemoryExec,
prelude::{SessionConfig, SessionContext},
};
@@ -1375,13 +1386,26 @@
let mut batches = vec![];
let mut num_rows = 0;
while num_rows < n {
- let nulls = ScalarValue::Null
- .to_array_of_size((n - num_rows).min(10000))
- .unwrap();
- let rand_key1 = random(&[ColumnarValue::Array(nulls.clone())])?.into_array(0)?;
- let rand_key2 = random(&[ColumnarValue::Array(nulls.clone())])?.into_array(0)?;
- let rand_val1 = random(&[ColumnarValue::Array(nulls.clone())])?.into_array(0)?;
- let rand_val2 = random(&[ColumnarValue::Array(nulls.clone())])?.into_array(0)?;
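+ // generate random u32 columns directly with the rand crate
+ // (the math_expressions::random helper import is removed above)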
+ let rand_key1: ArrayRef = Arc::new(
+ std::iter::repeat_with(|| rand::random::<u32>())
+ .take((n - num_rows).min(10000))
+ .collect::<UInt32Array>(),
+ );
+ let rand_key2: ArrayRef = Arc::new(
+ std::iter::repeat_with(|| rand::random::<u32>())
+ .take((n - num_rows).min(10000))
+ .collect::<UInt32Array>(),
+ );
+ let rand_val1: ArrayRef = Arc::new(
+ std::iter::repeat_with(|| rand::random::<u32>())
+ .take((n - num_rows).min(10000))
+ .collect::<UInt32Array>(),
+ );
+ let rand_val2: ArrayRef = Arc::new(
+ std::iter::repeat_with(|| rand::random::<u32>())
+ .take((n - num_rows).min(10000))
+ .collect::<UInt32Array>(),
+ );
let batch = RecordBatch::try_from_iter_with_nullable(vec![
("k1", rand_key1, true),
("k2", rand_key2, true),
@@ -1410,7 +1434,7 @@
)?);
let sort = Arc::new(SortExec::new(input, sort_exprs.clone(), None));
let output = datafusion::physical_plan::collect(sort, task_ctx.clone()).await?;
- let a = concat_batches(&schema, &output, n)?;
+ let a = concat_batches(&schema, &output)?;
let input = Arc::new(MemoryExec::try_new(
&[batches.clone()],
@@ -1422,7 +1446,7 @@
input,
));
let output = datafusion::physical_plan::collect(sort, task_ctx.clone()).await?;
- let b = concat_batches(&schema, &output, n)?;
+ let b = concat_batches(&schema, &output)?;
assert_eq!(a.num_rows(), b.num_rows());
assert!(a == b);
diff --git a/native-engine/datafusion-ext-plans/src/sort_merge_join_exec.rs b/native-engine/datafusion-ext-plans/src/sort_merge_join_exec.rs
index 75b8f63..db4c6e2 100644
--- a/native-engine/datafusion-ext-plans/src/sort_merge_join_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/sort_merge_join_exec.rs
@@ -20,19 +20,20 @@
common::{DataFusionError, JoinSide},
error::Result,
execution::context::TaskContext,
- physical_expr::{PhysicalExprRef, PhysicalSortExpr},
+ physical_expr::{EquivalenceProperties, PhysicalExprRef},
physical_plan::{
joins::utils::JoinOn,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, Time},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
- Statistics,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, SendableRecordBatchStream, Statistics,
},
};
use datafusion_ext_commons::{
batch_size, df_execution_err, streams::coalesce_stream::CoalesceInput,
};
use futures::TryStreamExt;
+use once_cell::sync::OnceCell;
use crate::{
common::{
@@ -62,6 +63,7 @@
sort_options: Vec<SortOptions>,
schema: SchemaRef,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl SortMergeJoinExec {
@@ -81,6 +83,7 @@
join_type,
sort_options,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
})
}
@@ -177,6 +180,10 @@
}
impl ExecutionPlan for SortMergeJoinExec {
+ fn name(&self) -> &str {
+ "SortMergeJoinExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -185,21 +192,18 @@
self.schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- self.right.output_partitioning()
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.right.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- match self.join_type {
- Left | LeftSemi | LeftAnti | Existence => self.left.output_ordering(),
- Right | RightSemi | RightAnti => self.right.output_ordering(),
- Inner => self.left.output_ordering(),
- Full => None,
- }
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.left.clone(), self.right.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.left, &self.right]
}
fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/window_exec.rs b/native-engine/datafusion-ext-plans/src/window_exec.rs
index 648463c..db83617 100644
--- a/native-engine/datafusion-ext-plans/src/window_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/window_exec.rs
@@ -23,16 +23,17 @@
use datafusion::{
common::{Result, Statistics},
execution::context::TaskContext,
- physical_expr::PhysicalSortExpr,
+ physical_expr::{EquivalenceProperties, PhysicalSortExpr},
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
- SendableRecordBatchStream,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+ PhysicalExpr, PlanProperties, SendableRecordBatchStream,
},
};
use datafusion_ext_commons::{cast::cast, streams::coalesce_stream::CoalesceInput};
use futures::{stream::once, StreamExt, TryFutureExt, TryStreamExt};
+use once_cell::sync::OnceCell;
use crate::{
common::output::TaskOutputter,
@@ -44,6 +45,7 @@
input: Arc<dyn ExecutionPlan>,
context: Arc<WindowContext>,
metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
}
impl WindowExec {
@@ -63,6 +65,7 @@
input,
context,
metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
})
}
}
@@ -74,6 +77,10 @@
}
impl ExecutionPlan for WindowExec {
+ fn name(&self) -> &str {
+ "WindowExec"
+ }
+
fn as_any(&self) -> &dyn Any {
self
}
@@ -82,16 +89,18 @@
self.context.output_schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ self.input.output_partitioning().clone(),
+ ExecutionMode::Bounded,
+ )
+ })
}
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.input.output_ordering()
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
}
fn with_new_children(
diff --git a/pom.xml b/pom.xml
index c212956..0f2f761 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
<properties>
<revision>3.0.1-SNAPSHOT</revision>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <arrowVersion>15.0.2</arrowVersion>
+ <arrowVersion>16.0.0</arrowVersion>
<protobufVersion>3.21.9</protobufVersion>
<hadoopClientApiVersion>3.4.0</hadoopClientApiVersion>
</properties>