update to datafusion-v42/arrow-v53/arrow-java-v16
diff --git a/Cargo.lock b/Cargo.lock
index d35ba60..573e319 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4,9 +4,9 @@
 
 [[package]]
 name = "addr2line"
-version = "0.24.1"
+version = "0.24.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375"
+checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
 dependencies = [
  "gimli",
 ]
@@ -78,9 +78,9 @@
 
 [[package]]
 name = "anyhow"
-version = "1.0.88"
+version = "1.0.89"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e1496f8fb1fbf272686b8d37f523dab3e4a7443300055e74cdaa449f3114356"
+checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
 
 [[package]]
 name = "arrayref"
@@ -96,8 +96,8 @@
 
 [[package]]
 name = "arrow"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "arrow-arith",
  "arrow-array",
@@ -116,8 +116,8 @@
 
 [[package]]
 name = "arrow-arith"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -130,24 +130,24 @@
 
 [[package]]
 name = "arrow-array"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "ahash",
  "arrow-buffer",
  "arrow-data",
  "arrow-schema",
  "chrono",
- "chrono-tz 0.8.6",
+ "chrono-tz",
  "half",
- "hashbrown",
+ "hashbrown 0.14.5",
  "num",
 ]
 
 [[package]]
 name = "arrow-buffer"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "bytes",
  "half",
@@ -156,26 +156,28 @@
 
 [[package]]
 name = "arrow-cast"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
  "arrow-data",
  "arrow-schema",
  "arrow-select",
- "base64 0.21.7",
+ "atoi",
+ "base64",
  "chrono",
  "comfy-table",
  "half",
  "lexical-core",
  "num",
+ "ryu",
 ]
 
 [[package]]
 name = "arrow-csv"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -192,8 +194,8 @@
 
 [[package]]
 name = "arrow-data"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "arrow-buffer",
  "arrow-schema",
@@ -203,8 +205,8 @@
 
 [[package]]
 name = "arrow-ipc"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -217,8 +219,8 @@
 
 [[package]]
 name = "arrow-json"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -236,8 +238,8 @@
 
 [[package]]
 name = "arrow-ord"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -250,8 +252,8 @@
 
 [[package]]
 name = "arrow-row"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -259,13 +261,12 @@
  "arrow-data",
  "arrow-schema",
  "half",
- "hashbrown",
 ]
 
 [[package]]
 name = "arrow-schema"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "bitflags 2.6.0",
  "serde",
@@ -273,8 +274,8 @@
 
 [[package]]
 name = "arrow-select"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -286,14 +287,15 @@
 
 [[package]]
 name = "arrow-string"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
  "arrow-data",
  "arrow-schema",
  "arrow-select",
+ "memchr",
  "num",
  "regex",
  "regex-syntax",
@@ -301,9 +303,9 @@
 
 [[package]]
 name = "async-compression"
-version = "0.4.12"
+version = "0.4.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fec134f64e2bc57411226dfc4e52dec859ddfc7e711fc5e07b612584f000e4aa"
+checksum = "7e614738943d3f68c628ae3dbce7c3daffb196665f82f8c8ea6b65de73c79429"
 dependencies = [
  "bzip2",
  "flate2",
@@ -325,14 +327,23 @@
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
+]
+
+[[package]]
+name = "atoi"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528"
+dependencies = [
+ "num-traits",
 ]
 
 [[package]]
 name = "autocfg"
-version = "1.3.0"
+version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
+checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
 
 [[package]]
 name = "backtrace"
@@ -351,12 +362,6 @@
 
 [[package]]
 name = "base64"
-version = "0.21.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
-
-[[package]]
-name = "base64"
 version = "0.22.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
@@ -457,7 +462,7 @@
 version = "0.1.0"
 dependencies = [
  "arrow",
- "base64 0.22.1",
+ "base64",
  "datafusion",
  "datafusion-ext-commons",
  "datafusion-ext-exprs",
@@ -480,9 +485,9 @@
 
 [[package]]
 name = "brotli"
-version = "3.5.0"
+version = "6.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391"
+checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b"
 dependencies = [
  "alloc-no-stdlib",
  "alloc-stdlib",
@@ -491,9 +496,9 @@
 
 [[package]]
 name = "brotli-decompressor"
-version = "2.5.1"
+version = "4.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f"
+checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362"
 dependencies = [
  "alloc-no-stdlib",
  "alloc-stdlib",
@@ -546,9 +551,9 @@
 
 [[package]]
 name = "cc"
-version = "1.1.18"
+version = "1.1.28"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b62ac837cdb5cb22e10a256099b4fc502b1dfe560cb282963a974d7abd80e476"
+checksum = "2e80e3b6a3ab07840e1cae9b0666a63970dc28e8ed5ffbcdacbfc760c281bfc1"
 dependencies = [
  "jobserver",
  "libc",
@@ -581,39 +586,17 @@
 
 [[package]]
 name = "chrono-tz"
-version = "0.8.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e"
-dependencies = [
- "chrono",
- "chrono-tz-build 0.2.1",
- "phf",
-]
-
-[[package]]
-name = "chrono-tz"
 version = "0.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb"
 dependencies = [
  "chrono",
- "chrono-tz-build 0.3.0",
+ "chrono-tz-build",
  "phf",
 ]
 
 [[package]]
 name = "chrono-tz-build"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f"
-dependencies = [
- "parse-zoneinfo",
- "phf",
- "phf_codegen",
-]
-
-[[package]]
-name = "chrono-tz-build"
 version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1"
@@ -701,6 +684,12 @@
 ]
 
 [[package]]
+name = "crossbeam-utils"
+version = "0.8.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
+
+[[package]]
 name = "crunchy"
 version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -739,12 +728,13 @@
 
 [[package]]
 name = "dashmap"
-version = "5.5.3"
+version = "6.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
+checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
 dependencies = [
  "cfg-if",
- "hashbrown",
+ "crossbeam-utils",
+ "hashbrown 0.14.5",
  "lock_api",
  "once_cell",
  "parking_lot_core",
@@ -752,8 +742,8 @@
 
 [[package]]
 name = "datafusion"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
 dependencies = [
  "ahash",
  "arrow",
@@ -766,27 +756,34 @@
  "bzip2",
  "chrono",
  "dashmap",
+ "datafusion-catalog",
  "datafusion-common",
+ "datafusion-common-runtime",
  "datafusion-execution",
  "datafusion-expr",
  "datafusion-functions",
- "datafusion-functions-array",
+ "datafusion-functions-aggregate",
+ "datafusion-functions-nested",
+ "datafusion-functions-window",
  "datafusion-optimizer",
  "datafusion-physical-expr",
+ "datafusion-physical-expr-common",
+ "datafusion-physical-optimizer",
  "datafusion-physical-plan",
  "datafusion-sql",
  "flate2",
  "futures",
  "glob",
  "half",
- "hashbrown",
+ "hashbrown 0.14.5",
  "indexmap",
- "itertools 0.12.1",
+ "itertools 0.13.0",
  "log",
  "num_cpus",
  "object_store",
  "parking_lot",
  "parquet",
+ "paste",
  "pin-project-lite",
  "rand",
  "sqlparser",
@@ -800,9 +797,23 @@
 ]
 
 [[package]]
+name = "datafusion-catalog"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "arrow-schema",
+ "async-trait",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-physical-plan",
+ "parking_lot",
+]
+
+[[package]]
 name = "datafusion-common"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
 dependencies = [
  "ahash",
  "arrow",
@@ -811,17 +822,30 @@
  "arrow-schema",
  "chrono",
  "half",
+ "hashbrown 0.14.5",
+ "instant",
  "libc",
  "num_cpus",
  "object_store",
  "parquet",
+ "paste",
  "sqlparser",
+ "tokio",
+]
+
+[[package]]
+name = "datafusion-common-runtime"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "log",
+ "tokio",
 ]
 
 [[package]]
 name = "datafusion-execution"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
 dependencies = [
  "arrow",
  "chrono",
@@ -829,7 +853,7 @@
  "datafusion-common",
  "datafusion-expr",
  "futures",
- "hashbrown",
+ "hashbrown 0.14.5",
  "log",
  "object_store",
  "parking_lot",
@@ -840,20 +864,36 @@
 
 [[package]]
 name = "datafusion-expr"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
 dependencies = [
  "ahash",
  "arrow",
  "arrow-array",
+ "arrow-buffer",
+ "chrono",
  "datafusion-common",
+ "datafusion-expr-common",
+ "datafusion-functions-aggregate-common",
+ "datafusion-physical-expr-common",
  "paste",
+ "serde_json",
  "sqlparser",
  "strum",
  "strum_macros",
 ]
 
 [[package]]
+name = "datafusion-expr-common"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "arrow",
+ "datafusion-common",
+ "paste",
+]
+
+[[package]]
 name = "datafusion-ext-commons"
 version = "0.1.0"
 dependencies = [
@@ -925,7 +965,7 @@
 dependencies = [
  "arrow",
  "async-trait",
- "base64 0.22.1",
+ "base64",
  "bitvec",
  "blaze-jni-bridge",
  "byteorder",
@@ -940,7 +980,7 @@
  "futures",
  "futures-util",
  "gxhash",
- "hashbrown",
+ "hashbrown 0.14.5",
  "itertools 0.13.0",
  "jni",
  "log",
@@ -952,6 +992,7 @@
  "panic-message",
  "parking_lot",
  "paste",
+ "rand",
  "slimmer_box",
  "smallvec",
  "tempfile",
@@ -963,76 +1004,23 @@
 
 [[package]]
 name = "datafusion-functions"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
 dependencies = [
  "arrow",
- "base64 0.21.7",
- "datafusion-common",
- "datafusion-execution",
- "datafusion-expr",
- "hex",
- "log",
-]
-
-[[package]]
-name = "datafusion-functions-array"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
-dependencies = [
- "arrow",
- "datafusion-common",
- "datafusion-execution",
- "datafusion-expr",
- "log",
- "paste",
-]
-
-[[package]]
-name = "datafusion-optimizer"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
-dependencies = [
- "arrow",
- "async-trait",
- "chrono",
- "datafusion-common",
- "datafusion-expr",
- "datafusion-physical-expr",
- "hashbrown",
- "itertools 0.12.1",
- "log",
- "regex-syntax",
-]
-
-[[package]]
-name = "datafusion-physical-expr"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
-dependencies = [
- "ahash",
- "arrow",
- "arrow-array",
  "arrow-buffer",
- "arrow-ord",
- "arrow-schema",
- "arrow-string",
- "base64 0.21.7",
+ "base64",
  "blake2",
  "blake3",
  "chrono",
  "datafusion-common",
  "datafusion-execution",
  "datafusion-expr",
- "half",
- "hashbrown",
+ "hashbrown 0.14.5",
  "hex",
- "indexmap",
- "itertools 0.12.1",
+ "itertools 0.13.0",
  "log",
  "md-5",
- "paste",
- "petgraph",
  "rand",
  "regex",
  "sha2",
@@ -1041,46 +1029,195 @@
 ]
 
 [[package]]
-name = "datafusion-physical-plan"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+name = "datafusion-functions-aggregate"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "ahash",
+ "arrow",
+ "arrow-schema",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-functions-aggregate-common",
+ "datafusion-physical-expr",
+ "datafusion-physical-expr-common",
+ "half",
+ "log",
+ "paste",
+ "sqlparser",
+]
+
+[[package]]
+name = "datafusion-functions-aggregate-common"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "ahash",
+ "arrow",
+ "datafusion-common",
+ "datafusion-expr-common",
+ "datafusion-physical-expr-common",
+ "rand",
+]
+
+[[package]]
+name = "datafusion-functions-nested"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "arrow",
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-ord",
+ "arrow-schema",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-functions",
+ "datafusion-functions-aggregate",
+ "datafusion-physical-expr-common",
+ "itertools 0.13.0",
+ "log",
+ "paste",
+ "rand",
+]
+
+[[package]]
+name = "datafusion-functions-window"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "datafusion-common",
+ "datafusion-expr",
+ "datafusion-physical-expr-common",
+ "log",
+]
+
+[[package]]
+name = "datafusion-optimizer"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "arrow",
+ "async-trait",
+ "chrono",
+ "datafusion-common",
+ "datafusion-expr",
+ "datafusion-physical-expr",
+ "hashbrown 0.14.5",
+ "indexmap",
+ "itertools 0.13.0",
+ "log",
+ "paste",
+ "regex-syntax",
+]
+
+[[package]]
+name = "datafusion-physical-expr"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
 dependencies = [
  "ahash",
  "arrow",
  "arrow-array",
  "arrow-buffer",
+ "arrow-ord",
  "arrow-schema",
- "async-trait",
+ "arrow-string",
+ "base64",
  "chrono",
  "datafusion-common",
  "datafusion-execution",
  "datafusion-expr",
+ "datafusion-expr-common",
+ "datafusion-functions-aggregate-common",
+ "datafusion-physical-expr-common",
+ "half",
+ "hashbrown 0.14.5",
+ "hex",
+ "indexmap",
+ "itertools 0.13.0",
+ "log",
+ "paste",
+ "petgraph",
+ "regex",
+]
+
+[[package]]
+name = "datafusion-physical-expr-common"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "ahash",
+ "arrow",
+ "datafusion-common",
+ "datafusion-expr-common",
+ "hashbrown 0.14.5",
+ "rand",
+]
+
+[[package]]
+name = "datafusion-physical-optimizer"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "arrow-schema",
+ "datafusion-common",
+ "datafusion-execution",
  "datafusion-physical-expr",
+ "datafusion-physical-plan",
+ "itertools 0.13.0",
+]
+
+[[package]]
+name = "datafusion-physical-plan"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
+dependencies = [
+ "ahash",
+ "arrow",
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-ord",
+ "arrow-schema",
+ "async-trait",
+ "chrono",
+ "datafusion-common",
+ "datafusion-common-runtime",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-functions-aggregate",
+ "datafusion-functions-aggregate-common",
+ "datafusion-physical-expr",
+ "datafusion-physical-expr-common",
  "futures",
  "half",
- "hashbrown",
+ "hashbrown 0.14.5",
  "indexmap",
- "itertools 0.12.1",
+ "itertools 0.13.0",
  "log",
  "once_cell",
  "parking_lot",
  "pin-project-lite",
  "rand",
  "tokio",
- "uuid",
 ]
 
 [[package]]
 name = "datafusion-sql"
-version = "36.0.0"
-source = "git+https://github.com/harveyyue/datafusion.git?rev=d33877f8fbc7c57de946dc6081b2b357eedd0df9#d33877f8fbc7c57de946dc6081b2b357eedd0df9"
+version = "42.0.0"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=dc799de77#dc799de77659d524027faa4f82470c285d412b0d"
 dependencies = [
  "arrow",
+ "arrow-array",
  "arrow-schema",
  "datafusion-common",
  "datafusion-expr",
  "log",
+ "regex",
  "sqlparser",
+ "strum",
 ]
 
 [[package]]
@@ -1106,12 +1243,6 @@
 ]
 
 [[package]]
-name = "doc-comment"
-version = "0.3.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
-
-[[package]]
 name = "either"
 version = "1.13.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1164,9 +1295,9 @@
 
 [[package]]
 name = "flatbuffers"
-version = "23.5.26"
+version = "24.3.25"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640"
+checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f"
 dependencies = [
  "bitflags 1.3.2",
  "rustc_version",
@@ -1174,9 +1305,9 @@
 
 [[package]]
 name = "flate2"
-version = "1.0.33"
+version = "1.0.34"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253"
+checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0"
 dependencies = [
  "crc32fast",
  "miniz_oxide",
@@ -1253,7 +1384,7 @@
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
@@ -1309,9 +1440,9 @@
 
 [[package]]
 name = "gimli"
-version = "0.31.0"
+version = "0.31.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64"
+checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
 
 [[package]]
 name = "glob"
@@ -1350,10 +1481,10 @@
 ]
 
 [[package]]
-name = "heck"
-version = "0.4.1"
+name = "hashbrown"
+version = "0.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
+checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
 
 [[package]]
 name = "heck"
@@ -1381,9 +1512,9 @@
 
 [[package]]
 name = "iana-time-zone"
-version = "0.1.60"
+version = "0.1.61"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141"
+checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220"
 dependencies = [
  "android_system_properties",
  "core-foundation-sys",
@@ -1414,12 +1545,24 @@
 
 [[package]]
 name = "indexmap"
-version = "2.5.0"
+version = "2.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5"
+checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
 dependencies = [
  "equivalent",
- "hashbrown",
+ "hashbrown 0.15.0",
+]
+
+[[package]]
+name = "instant"
+version = "0.1.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
+dependencies = [
+ "cfg-if",
+ "js-sys",
+ "wasm-bindgen",
+ "web-sys",
 ]
 
 [[package]]
@@ -1582,9 +1725,9 @@
 
 [[package]]
 name = "libc"
-version = "0.2.158"
+version = "0.2.159"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
+checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5"
 
 [[package]]
 name = "libm"
@@ -1761,28 +1904,28 @@
 
 [[package]]
 name = "object"
-version = "0.36.4"
+version = "0.36.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a"
+checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e"
 dependencies = [
  "memchr",
 ]
 
 [[package]]
 name = "object_store"
-version = "0.9.1"
+version = "0.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b8718f8b65fdf67a45108d1548347d4af7d71fb81ce727bbf9e3b2535e079db3"
+checksum = "25a0c4b3a0e31f8b66f71ad8064521efa773910196e2cde791436f13409f3b45"
 dependencies = [
  "async-trait",
  "bytes",
  "chrono",
  "futures",
  "humantime",
- "itertools 0.12.1",
+ "itertools 0.13.0",
  "parking_lot",
  "percent-encoding",
- "snafu 0.7.5",
+ "snafu",
  "tokio",
  "tracing",
  "url",
@@ -1798,16 +1941,13 @@
 [[package]]
 name = "orc-rust"
 version = "0.3.1"
-source = "git+https://github.com/harveyyue/datafusion-orc.git?rev=f0ff4bcffa762b62e8c57ed4c2f6e1a9547b4abb#f0ff4bcffa762b62e8c57ed4c2f6e1a9547b4abb"
+source = "git+https://github.com/blaze-init/datafusion-orc.git?rev=c54bfb5#c54bfb5bed3e74a51229be54373828e739dd53e6"
 dependencies = [
  "arrow",
  "async-trait",
  "bytes",
  "chrono",
- "chrono-tz 0.9.0",
- "datafusion",
- "datafusion-expr",
- "datafusion-physical-expr",
+ "chrono-tz",
  "fallible-streaming-iterator",
  "flate2",
  "futures",
@@ -1815,9 +1955,8 @@
  "lz4_flex",
  "lzokay-native",
  "num",
- "object_store",
  "prost 0.12.6",
- "snafu 0.8.4",
+ "snafu",
  "snap",
  "tokio",
  "zstd 0.12.4",
@@ -1863,8 +2002,8 @@
 
 [[package]]
 name = "parquet"
-version = "50.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=7471d70f7ae6edd5d4da82b7d966a8ede720e499#7471d70f7ae6edd5d4da82b7d966a8ede720e499"
+version = "53.0.0"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -1874,14 +2013,14 @@
  "arrow-ipc",
  "arrow-schema",
  "arrow-select",
- "base64 0.21.7",
+ "base64",
  "brotli",
  "bytes",
  "chrono",
  "flate2",
  "futures",
  "half",
- "hashbrown",
+ "hashbrown 0.14.5",
  "lz4_flex",
  "num",
  "num-bigint",
@@ -1893,6 +2032,7 @@
  "tokio",
  "twox-hash",
  "zstd 0.13.2",
+ "zstd-sys",
 ]
 
 [[package]]
@@ -1978,9 +2118,9 @@
 
 [[package]]
 name = "pkg-config"
-version = "0.3.30"
+version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
+checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2"
 
 [[package]]
 name = "ppv-lite86"
@@ -1998,14 +2138,14 @@
 checksum = "479cf940fbbb3426c32c5d5176f62ad57549a0bb84773423ba8be9d089f5faba"
 dependencies = [
  "proc-macro2",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.86"
+version = "1.0.87"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
+checksum = "b3e4daa0dcf6feba26f985457cdf104d4b4256fc5a09547140f3631bb076b19a"
 dependencies = [
  "unicode-ident",
 ]
@@ -2032,12 +2172,12 @@
 
 [[package]]
 name = "prost-build"
-version = "0.13.2"
+version = "0.13.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f8650aabb6c35b860610e9cff5dc1af886c9e25073b7b1712a68972af4281302"
+checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
 dependencies = [
  "bytes",
- "heck 0.5.0",
+ "heck",
  "itertools 0.13.0",
  "log",
  "multimap",
@@ -2047,7 +2187,7 @@
  "prost 0.13.3",
  "prost-types",
  "regex",
- "syn 2.0.77",
+ "syn 2.0.79",
  "tempfile",
 ]
 
@@ -2061,7 +2201,7 @@
  "itertools 0.12.1",
  "proc-macro2",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
@@ -2074,14 +2214,14 @@
  "itertools 0.13.0",
  "proc-macro2",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
 name = "prost-types"
-version = "0.13.2"
+version = "0.13.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "60caa6738c7369b940c3d49246a8d1749323674c65cb13010134f5c9bad5b519"
+checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670"
 dependencies = [
  "prost 0.13.3",
 ]
@@ -2159,18 +2299,18 @@
 
 [[package]]
 name = "redox_syscall"
-version = "0.5.4"
+version = "0.5.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0884ad60e090bf1345b93da0a5de8923c93884cd03f40dfcfddd3b4bee661853"
+checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f"
 dependencies = [
  "bitflags 2.6.0",
 ]
 
 [[package]]
 name = "regex"
-version = "1.10.6"
+version = "1.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619"
+checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -2180,9 +2320,9 @@
 
 [[package]]
 name = "regex-automata"
-version = "0.4.7"
+version = "0.4.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df"
+checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -2191,9 +2331,9 @@
 
 [[package]]
 name = "regex-syntax"
-version = "0.8.4"
+version = "0.8.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
+checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
 
 [[package]]
 name = "rustc-demangle"
@@ -2279,7 +2419,7 @@
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
@@ -2311,9 +2451,9 @@
 
 [[package]]
 name = "simdutf8"
-version = "0.1.4"
+version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
+checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e"
 
 [[package]]
 name = "siphasher"
@@ -2347,45 +2487,23 @@
 
 [[package]]
 name = "snafu"
-version = "0.7.5"
+version = "0.8.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6"
+checksum = "223891c85e2a29c3fe8fb900c1fae5e69c2e42415e3177752e8718475efa5019"
 dependencies = [
- "doc-comment",
- "snafu-derive 0.7.5",
-]
-
-[[package]]
-name = "snafu"
-version = "0.8.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b835cb902660db3415a672d862905e791e54d306c6e8189168c7f3d9ae1c79d"
-dependencies = [
- "snafu-derive 0.8.4",
+ "snafu-derive",
 ]
 
 [[package]]
 name = "snafu-derive"
-version = "0.7.5"
+version = "0.8.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf"
+checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917"
 dependencies = [
- "heck 0.4.1",
+ "heck",
  "proc-macro2",
  "quote",
- "syn 1.0.109",
-]
-
-[[package]]
-name = "snafu-derive"
-version = "0.8.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38d1e02fca405f6280643174a50c942219f0bbf4dbf7d480f1dd864d6f211ae5"
-dependencies = [
- "heck 0.5.0",
- "proc-macro2",
- "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
@@ -2415,9 +2533,9 @@
 
 [[package]]
 name = "sqlparser"
-version = "0.43.1"
+version = "0.50.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f95c4bae5aba7cd30bd506f7140026ade63cff5afd778af8854026f9606bf5d4"
+checksum = "b2e5b515a2bd5168426033e9efbfd05500114833916f1d5c268f938b4ee130ac"
 dependencies = [
  "log",
  "sqlparser_derive",
@@ -2431,7 +2549,7 @@
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
@@ -2455,11 +2573,11 @@
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
 dependencies = [
- "heck 0.5.0",
+ "heck",
  "proc-macro2",
  "quote",
  "rustversion",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
@@ -2481,9 +2599,9 @@
 
 [[package]]
 name = "syn"
-version = "2.0.77"
+version = "2.0.79"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed"
+checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2511,22 +2629,22 @@
 
 [[package]]
 name = "thiserror"
-version = "1.0.63"
+version = "1.0.64"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724"
+checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84"
 dependencies = [
  "thiserror-impl",
 ]
 
 [[package]]
 name = "thiserror-impl"
-version = "1.0.63"
+version = "1.0.64"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
+checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
@@ -2595,7 +2713,7 @@
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
@@ -2622,7 +2740,7 @@
  "prost-build",
  "prost-types",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
@@ -2644,7 +2762,7 @@
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
@@ -2680,9 +2798,9 @@
 
 [[package]]
 name = "unicode-bidi"
-version = "0.3.15"
+version = "0.3.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75"
+checksum = "5ab17db44d7388991a428b2ee655ce0c212e862eff1768a455c58f9aad6e7893"
 
 [[package]]
 name = "unicode-ident"
@@ -2692,9 +2810,9 @@
 
 [[package]]
 name = "unicode-normalization"
-version = "0.1.23"
+version = "0.1.24"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5"
+checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956"
 dependencies = [
  "tinyvec",
 ]
@@ -2707,9 +2825,9 @@
 
 [[package]]
 name = "unicode-width"
-version = "0.1.13"
+version = "0.1.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d"
+checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af"
 
 [[package]]
 name = "url"
@@ -2775,7 +2893,7 @@
  "once_cell",
  "proc-macro2",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
  "wasm-bindgen-shared",
 ]
 
@@ -2797,7 +2915,7 @@
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
  "wasm-bindgen-backend",
  "wasm-bindgen-shared",
 ]
@@ -2809,6 +2927,16 @@
 checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484"
 
 [[package]]
+name = "web-sys"
+version = "0.3.70"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
+[[package]]
 name = "winapi-util"
 version = "0.1.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2944,7 +3072,7 @@
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.77",
+ "syn 2.0.79",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 7228628..cb77161 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -51,42 +51,42 @@
 datafusion-ext-functions = { path = "./native-engine/datafusion-ext-functions" }
 datafusion-ext-plans = { path = "./native-engine/datafusion-ext-plans" }
 
-# datafusion: branch=v36-blaze
-datafusion = { version = "36.0.0" }
+# datafusion: branch=v42-blaze
+datafusion = { version = "42.0.0" }
 
 orc-rust = { version = "0.3.1" }
 
-# arrow: branch=v50-blaze
-arrow = { version = "50.0.0", features = ["ffi"]}
-arrow-schema = { version = "50.0.0", features = ["serde"] }
-parquet = { version = "50.0.0" }
+# arrow: branch=v53-blaze
+arrow = { version = "53.0.0", features = ["ffi"]}
+arrow-schema = { version = "53.0.0", features = ["serde"] }
+parquet = { version = "53.0.0" }
 
 # serde_json: branch=v1.0.96-blaze
 serde_json = { version = "1.0.96" }
 
 [patch.crates-io]
-# datafusion: branch=v36-blaze
-datafusion = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
-datafusion-common = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
-datafusion-expr = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
-datafusion-execution = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
-datafusion-optimizer = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
-datafusion-physical-expr = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
-orc-rust = { git = "https://github.com/harveyyue/datafusion-orc.git", rev = "f0ff4bcffa762b62e8c57ed4c2f6e1a9547b4abb"}
+# datafusion: branch=v42-blaze
+datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
+datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
+datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
+datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
+datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
+datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
+orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "c54bfb5"}
 
-# arrow: branch=v50-blaze
-arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
-parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
+# arrow: branch=v53-blaze
+arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
 
 # serde_json: branch=v1.0.96-blaze
 serde_json = { git = "https://github.com/blaze-init/json", branch = "v1.0.96-blaze" }
diff --git a/native-engine/blaze-serde/Cargo.toml b/native-engine/blaze-serde/Cargo.toml
index 3f97153..595fecc 100644
--- a/native-engine/blaze-serde/Cargo.toml
+++ b/native-engine/blaze-serde/Cargo.toml
@@ -15,7 +15,7 @@
 datafusion-ext-functions = { workspace = true }
 datafusion-ext-plans = { workspace = true }
 log = "0.4.22"
-object_store = "0.9.0"
+object_store = "0.11.0"
 prost = "0.13.3"
 
 [build-dependencies]
diff --git a/native-engine/blaze-serde/build.rs b/native-engine/blaze-serde/build.rs
index a6d281b..44153a8 100644
--- a/native-engine/blaze-serde/build.rs
+++ b/native-engine/blaze-serde/build.rs
@@ -17,7 +17,6 @@
     println!("cargo:rerun-if-env-changed=FORCE_REBUILD");
 
     println!("cargo:rerun-if-changed=proto/blaze.proto");
-    tonic_build::configure()
-        .compile(&["proto/blaze.proto"], &["proto"])
+    tonic_build::compile_protos("proto/blaze.proto")
         .map_err(|e| format!("protobuf compilation failed: {}", e))
 }
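
Note: `tonic_build::configure().compile(...)` took explicit proto-file and include-path lists; the top-level `tonic_build::compile_protos` helper covers the single-proto case and derives the include directory from the file's parent, so the two-argument call collapses to one. A minimal sketch of the resulting build script, assuming tonic-build's long-standing top-level helper:

```rust
// build.rs sketch: compile_protos infers the include dir ("proto") from the
// proto file's parent directory.
fn main() -> Result<(), String> {
    println!("cargo:rerun-if-changed=proto/blaze.proto");
    tonic_build::compile_protos("proto/blaze.proto")
        .map_err(|e| format!("protobuf compilation failed: {}", e))
}
```
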
diff --git a/native-engine/blaze-serde/proto/blaze.proto b/native-engine/blaze-serde/proto/blaze.proto
index cbf965d..a5387be 100644
--- a/native-engine/blaze-serde/proto/blaze.proto
+++ b/native-engine/blaze-serde/proto/blaze.proto
@@ -215,7 +215,7 @@
   Sqrt=17;
   Tan=18;
   Trunc=19;
-  Array=20;
+  NullIf=20;
   RegexpMatch=21;
   BitLength=22;
   Btrim=23;
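
Note: wire tag 20 is repurposed here (`Array` becomes `NullIf`). prost resolves enum variants purely by tag value, so bytes written by an older build would now decode differently; this is safe only because the plan protobuf is produced and consumed by matching builds. A one-line check against the generated `protobuf` module:

```rust
// prost-generated enums are i32-backed: tag 20 now names NullIf, the slot
// that previously named Array.
assert_eq!(protobuf::ScalarFunction::NullIf as i32, 20);
```
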
diff --git a/native-engine/blaze-serde/src/from_proto.rs b/native-engine/blaze-serde/src/from_proto.rs
index 6b5f7c9..7ab4677 100644
--- a/native-engine/blaze-serde/src/from_proto.rs
+++ b/native-engine/blaze-serde/src/from_proto.rs
@@ -33,11 +33,10 @@
         physical_plan::FileScanConfig,
     },
     error::DataFusionError,
-    execution::context::ExecutionProps,
-    logical_expr::{BuiltinScalarFunction, ColumnarValue, Operator},
+    logical_expr::{ColumnarValue, Operator, ScalarUDF, Volatility},
     physical_expr::{
         expressions::{in_list, LikeExpr, SCAndExpr, SCOrExpr},
-        functions, ScalarFunctionExpr,
+        ScalarFunctionExpr,
     },
     physical_plan::{
         expressions as phys_expr,
@@ -48,6 +47,7 @@
         union::UnionExec,
         ColumnStatistics, ExecutionPlan, Partitioning, PhysicalExpr, Statistics,
     },
+    prelude::create_udf,
 };
 use datafusion_ext_commons::downcast_any;
 use datafusion_ext_exprs::{
@@ -116,7 +116,7 @@
         let new_children = expr_in
             .children()
             .iter()
-            .map(|child_expr| bind(child_expr.clone(), input_schema))
+            .map(|&child_expr| bind(child_expr.clone(), input_schema))
             .collect::<Result<Vec<_>, DataFusionError>>()?;
         Ok(expr_in.with_new_children(new_children)?)
     }
@@ -804,74 +804,75 @@
     }
 }
 
-impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
-    fn from(f: &protobuf::ScalarFunction) -> BuiltinScalarFunction {
+impl From<protobuf::ScalarFunction> for Arc<ScalarUDF> {
+    fn from(f: protobuf::ScalarFunction) -> Self {
+        use datafusion::functions as f;
         use protobuf::ScalarFunction;
+
         match f {
-            ScalarFunction::Sqrt => Self::Sqrt,
-            ScalarFunction::Sin => Self::Sin,
-            ScalarFunction::Cos => Self::Cos,
-            ScalarFunction::Tan => Self::Tan,
-            ScalarFunction::Asin => Self::Asin,
-            ScalarFunction::Acos => Self::Acos,
-            ScalarFunction::Atan => Self::Atan,
-            ScalarFunction::Exp => Self::Exp,
-            ScalarFunction::Log => Self::Log,
-            ScalarFunction::Ln => Self::Ln,
-            ScalarFunction::Log10 => Self::Log10,
-            ScalarFunction::Floor => Self::Floor,
-            ScalarFunction::Ceil => Self::Ceil,
-            ScalarFunction::Round => Self::Round,
-            ScalarFunction::Trunc => Self::Trunc,
-            ScalarFunction::Abs => Self::Abs,
-            ScalarFunction::OctetLength => Self::OctetLength,
-            ScalarFunction::Concat => Self::Concat,
-            ScalarFunction::Lower => Self::Lower,
-            ScalarFunction::Upper => Self::Upper,
-            ScalarFunction::Trim => Self::Trim,
-            ScalarFunction::Ltrim => Self::Ltrim,
-            ScalarFunction::Rtrim => Self::Rtrim,
-            ScalarFunction::ToTimestamp => Self::ToTimestamp,
-            ScalarFunction::Array => Self::MakeArray,
-            // ScalarFunction::NullIf => todo!(),
-            ScalarFunction::DatePart => Self::DatePart,
-            ScalarFunction::DateTrunc => Self::DateTrunc,
-            ScalarFunction::Md5 => Self::MD5,
-            ScalarFunction::Sha224 => Self::SHA224,
-            ScalarFunction::Sha256 => Self::SHA256,
-            ScalarFunction::Sha384 => Self::SHA384,
-            ScalarFunction::Sha512 => Self::SHA512,
-            ScalarFunction::Digest => Self::Digest,
-            ScalarFunction::ToTimestampMillis => Self::ToTimestampMillis,
-            ScalarFunction::Log2 => Self::Log2,
-            ScalarFunction::Signum => Self::Signum,
-            ScalarFunction::Ascii => Self::Ascii,
-            ScalarFunction::BitLength => Self::BitLength,
-            ScalarFunction::Btrim => Self::Btrim,
-            ScalarFunction::CharacterLength => Self::CharacterLength,
-            ScalarFunction::Chr => Self::Chr,
-            ScalarFunction::ConcatWithSeparator => Self::ConcatWithSeparator,
-            ScalarFunction::InitCap => Self::InitCap,
-            ScalarFunction::Left => Self::Left,
-            ScalarFunction::Lpad => Self::Lpad,
-            ScalarFunction::Random => Self::Random,
-            ScalarFunction::RegexpReplace => Self::RegexpReplace,
-            ScalarFunction::Repeat => Self::Repeat,
-            ScalarFunction::Replace => Self::Replace,
-            ScalarFunction::Reverse => Self::Reverse,
-            ScalarFunction::Right => Self::Right,
-            ScalarFunction::Rpad => Self::Rpad,
-            ScalarFunction::SplitPart => Self::SplitPart,
-            ScalarFunction::StartsWith => Self::StartsWith,
-            ScalarFunction::Strpos => Self::Strpos,
-            ScalarFunction::Substr => Self::Substr,
-            ScalarFunction::ToHex => Self::ToHex,
-            ScalarFunction::ToTimestampMicros => Self::ToTimestampMicros,
-            ScalarFunction::ToTimestampSeconds => Self::ToTimestampSeconds,
-            ScalarFunction::Now => Self::Now,
-            ScalarFunction::Translate => Self::Translate,
-            ScalarFunction::RegexpMatch => Self::RegexpMatch,
-            ScalarFunction::Coalesce => Self::Coalesce,
+            ScalarFunction::Sqrt => f::math::sqrt(),
+            ScalarFunction::Sin => f::math::sin(),
+            ScalarFunction::Cos => f::math::cos(),
+            ScalarFunction::Tan => f::math::tan(),
+            ScalarFunction::Asin => f::math::asin(),
+            ScalarFunction::Acos => f::math::acos(),
+            ScalarFunction::Atan => f::math::atan(),
+            ScalarFunction::Exp => f::math::exp(),
+            ScalarFunction::Log => f::math::log(),
+            ScalarFunction::Ln => f::math::ln(),
+            ScalarFunction::Log10 => f::math::log10(),
+            ScalarFunction::Floor => f::math::floor(),
+            ScalarFunction::Ceil => f::math::ceil(),
+            ScalarFunction::Round => f::math::round(),
+            ScalarFunction::Trunc => f::math::trunc(),
+            ScalarFunction::Abs => f::math::abs(),
+            ScalarFunction::OctetLength => f::string::octet_length(),
+            ScalarFunction::Concat => f::string::concat(),
+            ScalarFunction::Lower => f::string::lower(),
+            ScalarFunction::Upper => f::string::upper(),
+            ScalarFunction::Trim => f::string::btrim(),
+            ScalarFunction::Ltrim => f::string::ltrim(),
+            ScalarFunction::Rtrim => f::string::rtrim(),
+            ScalarFunction::ToTimestamp => f::datetime::to_timestamp(),
+            ScalarFunction::NullIf => f::core::nullif(),
+            ScalarFunction::DatePart => f::datetime::date_part(),
+            ScalarFunction::DateTrunc => f::datetime::date_trunc(),
+            ScalarFunction::Md5 => f::crypto::md5(),
+            ScalarFunction::Sha224 => f::crypto::sha224(),
+            ScalarFunction::Sha256 => f::crypto::sha256(),
+            ScalarFunction::Sha384 => f::crypto::sha384(),
+            ScalarFunction::Sha512 => f::crypto::sha512(),
+            ScalarFunction::Digest => f::crypto::digest(),
+            ScalarFunction::ToTimestampMillis => f::datetime::to_timestamp_millis(),
+            ScalarFunction::Log2 => f::math::log2(),
+            ScalarFunction::Signum => f::math::signum(),
+            ScalarFunction::Ascii => f::string::ascii(),
+            ScalarFunction::BitLength => f::string::bit_length(),
+            ScalarFunction::Btrim => f::string::btrim(),
+            ScalarFunction::CharacterLength => f::unicode::character_length(),
+            ScalarFunction::Chr => f::string::chr(),
+            ScalarFunction::ConcatWithSeparator => f::string::concat_ws(),
+            ScalarFunction::InitCap => f::string::initcap(),
+            ScalarFunction::Left => f::unicode::left(),
+            ScalarFunction::Lpad => f::unicode::lpad(),
+            ScalarFunction::Random => f::math::random(),
+            ScalarFunction::RegexpReplace => f::regex::regexp_replace(),
+            ScalarFunction::Repeat => f::string::repeat(),
+            ScalarFunction::Replace => f::string::replace(),
+            ScalarFunction::Reverse => f::unicode::reverse(),
+            ScalarFunction::Right => f::unicode::right(),
+            ScalarFunction::Rpad => f::unicode::rpad(),
+            ScalarFunction::SplitPart => f::string::split_part(),
+            ScalarFunction::StartsWith => f::string::starts_with(),
+            ScalarFunction::Strpos => f::unicode::strpos(),
+            ScalarFunction::Substr => f::unicode::substr(),
+            ScalarFunction::ToHex => f::string::to_hex(),
+            ScalarFunction::ToTimestampMicros => f::datetime::to_timestamp_micros(),
+            ScalarFunction::ToTimestampSeconds => f::datetime::to_timestamp_seconds(),
+            ScalarFunction::Now => f::datetime::now(),
+            ScalarFunction::Translate => f::unicode::translate(),
+            ScalarFunction::RegexpMatch => f::regex::regexp_match(),
+            ScalarFunction::Coalesce => f::core::coalesce(),
             ScalarFunction::SparkExtFunctions => {
                 unreachable!()
             }
@@ -998,20 +999,26 @@
                     .map(|x| try_parse_physical_expr(x, input_schema))
                     .collect::<Result<Vec<_>, _>>()?;
 
-                let execution_props = ExecutionProps::new();
-                let fun_expr = if scalar_function == protobuf::ScalarFunction::SparkExtFunctions {
-                    datafusion_ext_functions::create_spark_ext_function(&e.name)?
+                let scalar_udf = if scalar_function == protobuf::ScalarFunction::SparkExtFunctions {
+                    let fun = datafusion_ext_functions::create_spark_ext_function(&e.name)?;
+                    Arc::new(create_udf(
+                        "spark_ext_function",
+                        args.iter()
+                            .map(|e| e.data_type(input_schema))
+                            .collect::<Result<Vec<_>, _>>()?,
+                        Arc::new(convert_required!(e.return_type)?),
+                        Volatility::Volatile,
+                        fun,
+                    ))
                 } else {
-                    functions::create_physical_fun(&(&scalar_function).into(), &execution_props)?
+                    let scalar_udf: Arc<ScalarUDF> = scalar_function.into();
+                    scalar_udf
                 };
-
                 Arc::new(ScalarFunctionExpr::new(
-                    &e.name,
-                    fun_expr,
+                    scalar_udf.name(),
+                    scalar_udf.clone(),
                     args,
                     convert_required!(e.return_type)?,
-                    None,
-                    false,
                 ))
             }
             ExprType::SparkUdfWrapperExpr(e) => Arc::new(SparkUDFWrapperExpr::try_new(
@@ -1153,6 +1160,7 @@
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
             range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
+            statistics: None,
             extensions: None,
         })
     }
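
Note: this file's migration tracks DataFusion 42's removal of `BuiltinScalarFunction`: every scalar function is now an `Arc<ScalarUDF>`, the former built-ins are exposed as constructors under `datafusion::functions`, and `ScalarFunctionExpr::new` takes the UDF plus a plain return type (the execution-props and monotonicity arguments are gone). A minimal sketch of the new construction path, assuming the DataFusion 42 API pinned by this PR:

```rust
use std::sync::Arc;

use datafusion::{
    arrow::datatypes::DataType,
    functions as f,
    logical_expr::ScalarUDF,
    physical_expr::{expressions::Column, ScalarFunctionExpr},
};

fn main() {
    // Former built-ins are now constructors returning Arc<ScalarUDF>.
    let sqrt: Arc<ScalarUDF> = f::math::sqrt();
    assert_eq!(sqrt.name(), "sqrt");

    // ScalarFunctionExpr::new(name, udf, args, return_type).
    let _expr = ScalarFunctionExpr::new(
        sqrt.name(),
        sqrt.clone(),
        vec![Arc::new(Column::new("x", 0))],
        DataType::Float64,
    );
}
```
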
diff --git a/native-engine/blaze-serde/src/lib.rs b/native-engine/blaze-serde/src/lib.rs
index 0b0b8b3..223496b 100644
--- a/native-engine/blaze-serde/src/lib.rs
+++ b/native-engine/blaze-serde/src/lib.rs
@@ -498,7 +498,11 @@
                     .map(|val| val.try_into())
                     .collect::<Result<Vec<_>, _>>()?;
                 let scalar_type: DataType = pb_scalar_type.try_into()?;
-                ScalarValue::List(ScalarValue::new_list(&typechecked_values, &scalar_type))
+                ScalarValue::List(ScalarValue::new_list(
+                    &typechecked_values,
+                    &scalar_type,
+                    true,
+                ))
             }
             protobuf::scalar_value::Value::NullValue(v) => {
                 match v.datatype.as_ref().expect("missing scalar data type") {
@@ -633,6 +637,7 @@
         Ok(ScalarValue::List(ScalarValue::new_list(
             &values,
             &element_scalar_type,
+            true,
         )))
     }
 }
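
Note: `ScalarValue::new_list` gained an explicit `nullable` flag for the list's element field in DataFusion 42 (the added `true` above), and returns the `Arc<ListArray>` that `ScalarValue::List` now wraps. A small sketch:

```rust
use datafusion::{arrow::datatypes::DataType, scalar::ScalarValue};

fn main() {
    let values = vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(None)];
    // new_list(values, element_type, nullable): `true` matches elements
    // that may be null, like the None above.
    let _list = ScalarValue::List(ScalarValue::new_list(&values, &DataType::Int32, true));
}
```
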
diff --git a/native-engine/blaze/src/metrics.rs b/native-engine/blaze/src/metrics.rs
index ecb47fb..51665b3 100644
--- a/native-engine/blaze/src/metrics.rs
+++ b/native-engine/blaze/src/metrics.rs
@@ -39,7 +39,7 @@
     )?;
 
     // update children nodes
-    for (i, child_plan) in execution_plan.children().iter().enumerate() {
+    for (i, &child_plan) in execution_plan.children().iter().enumerate() {
         let child_metric_node = jni_call!(
             SparkMetricNode(metric_node).getChild(i as i32) -> JObject
         )?;
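
`ExecutionPlan::children` now returns `Vec<&Arc<dyn ExecutionPlan>>`, so the loop destructures one level of reference (`&child_plan`) instead of borrowing a cloned `Arc`. A sketch of the borrowing traversal, assuming only the new trait surface:

use datafusion::physical_plan::ExecutionPlan;

fn visit(plan: &dyn ExecutionPlan) {
    // children() hands out borrows; no Arc clone per child
    for (i, &child) in plan.children().iter().enumerate() {
        println!("child {i}: {}", child.name());
        visit(child.as_ref());
    }
}
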
diff --git a/native-engine/datafusion-ext-commons/src/io/batch_serde.rs b/native-engine/datafusion-ext-commons/src/io/batch_serde.rs
index d10ddb6..df46dfc 100644
--- a/native-engine/datafusion-ext-commons/src/io/batch_serde.rs
+++ b/native-engine/datafusion-ext-commons/src/io/batch_serde.rs
@@ -162,14 +162,13 @@
 
 fn read_bits_buffer<R: Read>(input: &mut R, bits_len: usize) -> Result<Buffer> {
     let buf = read_bytes_slice(input, (bits_len + 7) / 8)?;
-    Ok(Buffer::from(buf))
+    Ok(Buffer::from_vec(buf.into()))
 }
 
 fn write_primitive_array<W: Write, PT: ArrowPrimitiveType>(
     array: &PrimitiveArray<PT>,
     output: &mut W,
 ) -> Result<()> {
-    let _item_size = PT::get_byte_width();
     let offset = array.offset();
     let len = array.len();
     let array_data = array.to_data();
@@ -510,7 +509,7 @@
     let offsets_buffer: Buffer = offsets_buffer.into();
 
     let data_len = cur_offset as usize;
-    let data_buffer = Buffer::from(read_bytes_slice(input, data_len)?);
+    let data_buffer = Buffer::from_vec(read_bytes_slice(input, data_len)?.into());
     let array_data = ArrayData::try_new(
         data_type,
         num_rows,
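
`read_bytes_slice` appears to return an owned `Box<[u8]>` here; `Buffer::from_vec` adopts that allocation with no copy, whereas the old `Buffer::from(&[u8])` path memcpys the slice. A sketch under that assumption:

use arrow::buffer::Buffer;

fn bits_to_buffer(bytes: Box<[u8]>) -> Buffer {
    // Vec<u8> -> Buffer takes ownership of the allocation; the `.into()`
    // in the code above is this same Box<[u8]> -> Vec<u8> conversion
    Buffer::from_vec(bytes.into_vec())
}
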
diff --git a/native-engine/datafusion-ext-commons/src/io/scalar_serde.rs b/native-engine/datafusion-ext-commons/src/io/scalar_serde.rs
index 344c785..c634644 100644
--- a/native-engine/datafusion-ext-commons/src/io/scalar_serde.rs
+++ b/native-engine/datafusion-ext-commons/src/io/scalar_serde.rs
@@ -90,8 +90,8 @@
                 write_array(col, output)?;
             }
         }
-        ScalarValue::Map(value, _bool) => {
-            write_scalar(value, nullable, output)?;
+        ScalarValue::Map(v) => {
+            write_array(v.as_ref(), output)?;
         }
         other => df_unimplemented_err!("unsupported scalarValue type: {other}")?,
     }
@@ -186,9 +186,9 @@
                 .collect::<Result<Vec<_>>>()?;
             ScalarValue::Struct(Arc::new(StructArray::new(fields.clone(), columns, None)))
         }
-        DataType::Map(field, bool) => {
-            let map_value = read_scalar(input, field.data_type(), field.is_nullable())?;
-            ScalarValue::Map(Box::new(map_value), *bool)
+        DataType::Map(field, _bool) => {
+            let map = read_array(input, field.data_type(), 1)?.as_map().clone();
+            ScalarValue::Map(Arc::new(map))
         }
         other => df_unimplemented_err!("unsupported data type: {other}")?,
     })
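
DataFusion 42 represents a map scalar as a single-row `MapArray` (`ScalarValue::Map(Arc<MapArray>)`) rather than the old boxed scalar plus keys-sorted flag, which is why the serde path can reuse the generic `write_array`/`read_array` routines. The new shape, sketched:

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, AsArray};
use datafusion::common::ScalarValue;

fn map_scalar_from_array(array: &ArrayRef) -> ScalarValue {
    assert_eq!(array.len(), 1); // a map scalar is exactly one row
    ScalarValue::Map(Arc::new(array.as_map().clone()))
}
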
diff --git a/native-engine/datafusion-ext-commons/src/spark_hash.rs b/native-engine/datafusion-ext-commons/src/spark_hash.rs
index b095a89..a02c43b 100644
--- a/native-engine/datafusion-ext-commons/src/spark_hash.rs
+++ b/native-engine/datafusion-ext-commons/src/spark_hash.rs
@@ -547,12 +547,14 @@
         // Construct key and values
         let key_data = ArrayData::builder(DataType::Int32)
             .len(8)
-            .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
+            .add_buffer(Buffer::from_slice_ref(
+                &[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice(),
+            ))
             .build()
             .unwrap();
         let value_data = ArrayData::builder(DataType::UInt32)
             .len(8)
-            .add_buffer(Buffer::from(
+            .add_buffer(Buffer::from_slice_ref(
                 &[0u32, 10, 20, 0, 40, 0, 60, 70].to_byte_slice(),
             ))
             .null_bit_buffer(Some(Buffer::from(&[0b11010110])))
@@ -561,7 +563,7 @@
 
         // Construct a buffer for value offsets, for the nested array:
         //  [[0, 1, 2], [3, 4, 5], [6, 7]]
-        let entry_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice());
+        let entry_offsets = Buffer::from_slice_ref(&[0, 3, 6, 8].to_byte_slice());
 
         let keys_field = Arc::new(Field::new("keys", DataType::Int32, false));
         let values_field = Arc::new(Field::new("values", DataType::UInt32, true));
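
`Buffer::from_slice_ref` is the explicit copying constructor over any `&[T]` of Arrow native types; it produces the same native-endian bytes whether fed the `i32` slice directly or the `to_byte_slice()` view kept here from the old code. For instance:

use arrow::buffer::Buffer;
use arrow::datatypes::ToByteSlice;

fn equivalent_buffers() -> (Buffer, Buffer) {
    let a = Buffer::from_slice_ref(&[0i32, 3, 6, 8]);
    let b = Buffer::from_slice_ref(&[0i32, 3, 6, 8].to_byte_slice());
    assert_eq!(a, b); // the same 16 bytes either way
    (a, b)
}
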
diff --git a/native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs b/native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs
index fc03745..d1c5910 100644
--- a/native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs
+++ b/native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs
@@ -19,12 +19,14 @@
 };
 
 use arrow::{
-    array::{make_array, new_empty_array, Array, ArrayRef, AsArray, Capacities, MutableArrayData},
+    array::{
+        make_array, new_empty_array, Array, ArrayRef, AsArray, Capacities, MutableArrayData,
+        RecordBatch, RecordBatchOptions,
+    },
     datatypes::{
         ArrowNativeType, BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, SchemaRef,
         Utf8Type,
     },
-    record_batch::{RecordBatch, RecordBatchOptions},
 };
 use arrow_schema::DataType;
 use datafusion::{
diff --git a/native-engine/datafusion-ext-exprs/src/bloom_filter_might_contain.rs b/native-engine/datafusion-ext-exprs/src/bloom_filter_might_contain.rs
index 32878a3..5b9bb49 100644
--- a/native-engine/datafusion-ext-exprs/src/bloom_filter_might_contain.rs
+++ b/native-engine/datafusion-ext-exprs/src/bloom_filter_might_contain.rs
@@ -123,8 +123,8 @@
         Ok(ColumnarValue::Array(Arc::new(might_contain)))
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.bloom_filter_expr.clone(), self.value_expr.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.bloom_filter_expr, &self.value_expr]
     }
 
     fn with_new_children(
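
The same signature change lands on `PhysicalExpr`: `children` now returns `Vec<&Arc<dyn PhysicalExpr>>`, and every expression below gets the mechanical rewrite (`vec![&self.expr]`, or `.iter().collect()` for a `Vec` field). The shape of the pair, as a sketch rather than the full trait impl:

use std::sync::Arc;

use datafusion::common::Result;
use datafusion::physical_expr::PhysicalExpr;

struct Wrapper {
    expr: Arc<dyn PhysicalExpr>,
}

impl Wrapper {
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.expr] // borrow; callers clone only if they must
    }

    fn with_new_children(&self, mut new: Vec<Arc<dyn PhysicalExpr>>) -> Result<Self> {
        Ok(Wrapper {
            expr: new.swap_remove(0), // ownership still flows in here
        })
    }
}
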
diff --git a/native-engine/datafusion-ext-exprs/src/cast.rs b/native-engine/datafusion-ext-exprs/src/cast.rs
index 78a1e0f..b3cd640 100644
--- a/native-engine/datafusion-ext-exprs/src/cast.rs
+++ b/native-engine/datafusion-ext-exprs/src/cast.rs
@@ -82,8 +82,8 @@
         })
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.expr]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-exprs/src/get_indexed_field.rs b/native-engine/datafusion-ext-exprs/src/get_indexed_field.rs
index 6fd6620..0a94ec5 100644
--- a/native-engine/datafusion-ext-exprs/src/get_indexed_field.rs
+++ b/native-engine/datafusion-ext-exprs/src/get_indexed_field.rs
@@ -133,8 +133,8 @@
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.arg.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.arg]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-exprs/src/get_map_value.rs b/native-engine/datafusion-ext-exprs/src/get_map_value.rs
index 387eed4..718fedb 100644
--- a/native-engine/datafusion-ext-exprs/src/get_map_value.rs
+++ b/native-engine/datafusion-ext-exprs/src/get_map_value.rs
@@ -21,6 +21,7 @@
 
 use arrow::{
     array::*,
+    compute::SortOptions,
     datatypes::{DataType, Field, Schema},
     record_batch::RecordBatch,
 };
@@ -95,7 +96,8 @@
                 let as_map_array = array.as_map();
                 let value_data = as_map_array.values().to_data();
                 let key = self.key.to_array()?;
-                let comparator = build_compare(as_map_array.keys(), &key)?;
+                let comparator =
+                    make_comparator(as_map_array.keys(), &key, SortOptions::default())?;
                 let mut mutable =
                     MutableArrayData::new(vec![&value_data], true, as_map_array.len());
 
@@ -125,8 +127,8 @@
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.arg.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.arg]
     }
 
     fn with_new_children(
@@ -178,19 +180,21 @@
         // Construct key and values
         let key_data = ArrayData::builder(DataType::Int32)
             .len(8)
-            .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
+            .add_buffer(Buffer::from_slice_ref(
+                &[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice(),
+            ))
             .build()
             .unwrap();
         let value_data = ArrayData::builder(DataType::UInt32)
             .len(8)
-            .add_buffer(Buffer::from(
+            .add_buffer(Buffer::from_slice_ref(
                 &[0u32, 10, 20, 0, 40, 0, 60, 70].to_byte_slice(),
             ))
-            .null_bit_buffer(Some(Buffer::from(&[0b11010110])))
+            .null_bit_buffer(Some(Buffer::from_slice_ref(&[0b11010110])))
             .build()
             .unwrap();
 
-        let entry_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice());
+        let entry_offsets = Buffer::from_slice_ref(&[0, 3, 6, 8].to_byte_slice());
 
         let keys_field = Arc::new(Field::new("keys", DataType::Int32, false));
         let values_field = Arc::new(Field::new("values", DataType::UInt32, true));
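
arrow 53 replaces `build_compare` with `make_comparator` (home module `arrow_ord::ord`, re-exported through the arrow facade), which takes an explicit `SortOptions` for null/descending ordering and returns a boxed `(usize, usize) -> Ordering` comparator across the two arrays. A small sketch:

use arrow::array::Int32Array;
use arrow::compute::SortOptions;
use arrow_ord::ord::make_comparator;

fn example() -> arrow::error::Result<()> {
    let keys = Int32Array::from(vec![3, 1, 2]);
    let probe = Int32Array::from(vec![2]);
    let cmp = make_comparator(&keys, &probe, SortOptions::default())?;
    // compares keys[2] (= 2) against probe[0] (= 2)
    assert_eq!(cmp(2, 0), std::cmp::Ordering::Equal);
    Ok(())
}
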
diff --git a/native-engine/datafusion-ext-exprs/src/named_struct.rs b/native-engine/datafusion-ext-exprs/src/named_struct.rs
index 0093869..a217731 100644
--- a/native-engine/datafusion-ext-exprs/src/named_struct.rs
+++ b/native-engine/datafusion-ext-exprs/src/named_struct.rs
@@ -105,8 +105,8 @@
         Ok(ColumnarValue::Array(named_struct))
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        self.values.clone()
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        self.values.iter().collect()
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-exprs/src/row_num.rs b/native-engine/datafusion-ext-exprs/src/row_num.rs
index 4def27f..d281348 100644
--- a/native-engine/datafusion-ext-exprs/src/row_num.rs
+++ b/native-engine/datafusion-ext-exprs/src/row_num.rs
@@ -71,7 +71,7 @@
         Ok(ColumnarValue::Array(Arc::new(array)))
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
         vec![]
     }
 
diff --git a/native-engine/datafusion-ext-exprs/src/spark_scalar_subquery_wrapper.rs b/native-engine/datafusion-ext-exprs/src/spark_scalar_subquery_wrapper.rs
index 1da0e76..20dd9c6 100644
--- a/native-engine/datafusion-ext-exprs/src/spark_scalar_subquery_wrapper.rs
+++ b/native-engine/datafusion-ext-exprs/src/spark_scalar_subquery_wrapper.rs
@@ -113,7 +113,7 @@
         result.cloned()
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
         vec![]
     }
 
diff --git a/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs b/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
index 688101d..98c5334 100644
--- a/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
+++ b/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
@@ -197,8 +197,8 @@
         Ok(ColumnarValue::Array(imported_array))
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        self.params.clone()
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        self.params.iter().collect()
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-exprs/src/string_contains.rs b/native-engine/datafusion-ext-exprs/src/string_contains.rs
index 023863c..770708d 100644
--- a/native-engine/datafusion-ext-exprs/src/string_contains.rs
+++ b/native-engine/datafusion-ext-exprs/src/string_contains.rs
@@ -101,8 +101,8 @@
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.expr]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-exprs/src/string_ends_with.rs b/native-engine/datafusion-ext-exprs/src/string_ends_with.rs
index 514fd7e..34f0b42 100644
--- a/native-engine/datafusion-ext-exprs/src/string_ends_with.rs
+++ b/native-engine/datafusion-ext-exprs/src/string_ends_with.rs
@@ -100,8 +100,8 @@
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.expr]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-exprs/src/string_starts_with.rs b/native-engine/datafusion-ext-exprs/src/string_starts_with.rs
index 4bb8d96..672cafc 100644
--- a/native-engine/datafusion-ext-exprs/src/string_starts_with.rs
+++ b/native-engine/datafusion-ext-exprs/src/string_starts_with.rs
@@ -100,8 +100,8 @@
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.expr]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-functions/src/spark_dates.rs b/native-engine/datafusion-ext-functions/src/spark_dates.rs
index 7922339..265255b 100644
--- a/native-engine/datafusion-ext-functions/src/spark_dates.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_dates.rs
@@ -12,22 +12,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use arrow::compute::{day_dyn, month_dyn, year_dyn};
+use arrow::compute::{date_part, DatePart};
 use datafusion::{common::Result, physical_plan::ColumnarValue};
 
 pub fn spark_year(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     let input = args[0].clone().into_array(1)?;
-    Ok(ColumnarValue::Array(year_dyn(&input)?))
+    Ok(ColumnarValue::Array(date_part(&input, DatePart::Year)?))
 }
 
 pub fn spark_month(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     let input = args[0].clone().into_array(1)?;
-    Ok(ColumnarValue::Array(month_dyn(&input)?))
+    Ok(ColumnarValue::Array(date_part(&input, DatePart::Month)?))
 }
 
 pub fn spark_day(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     let input = args[0].clone().into_array(1)?;
-    Ok(ColumnarValue::Array(day_dyn(&input)?))
+    Ok(ColumnarValue::Array(date_part(&input, DatePart::Day)?))
 }
 
 #[cfg(test)]
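
arrow 53 folds the per-unit temporal kernels (`year_dyn`, `month_dyn`, `day_dyn`, …) into one `date_part` kernel parameterized by `DatePart`. For example:

use std::sync::Arc;

use arrow::array::{ArrayRef, Date32Array};
use arrow::compute::{date_part, DatePart};

fn example() -> arrow::error::Result<ArrayRef> {
    // 19723 days since the epoch = 2024-01-01
    let dates: ArrayRef = Arc::new(Date32Array::from(vec![19723]));
    date_part(&dates, DatePart::Year) // -> Int32Array [2024]
}
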
diff --git a/native-engine/datafusion-ext-functions/src/spark_make_array.rs b/native-engine/datafusion-ext-functions/src/spark_make_array.rs
index 4d2159e..a86569c 100644
--- a/native-engine/datafusion-ext-functions/src/spark_make_array.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_make_array.rs
@@ -118,6 +118,7 @@
                 output_scalars.push(ScalarValue::List(ScalarValue::new_list(
                     &row_scalars,
                     data_type,
+                    true,
                 )));
             }
             ScalarValue::iter_to_array(output_scalars)?
diff --git a/native-engine/datafusion-ext-plans/Cargo.toml b/native-engine/datafusion-ext-plans/Cargo.toml
index 7b35284..f28da14 100644
--- a/native-engine/datafusion-ext-plans/Cargo.toml
+++ b/native-engine/datafusion-ext-plans/Cargo.toml
@@ -30,7 +30,7 @@
 log = "0.4.22"
 lz4_flex = "0.11.2"
 num = "0.4.2"
-object_store = "0.9.0"
+object_store = "0.11.0"
 once_cell = "1.20.2"
 panic-message = "0.3.0"
 parking_lot = "0.12.3"
@@ -44,3 +44,6 @@
 zstd = "0.13.2"
 orc-rust = { workspace = true }
 futures-util = "0.3.31"
+
+[dev-dependencies]
+rand = "0.8.5"
diff --git a/native-engine/datafusion-ext-plans/src/agg/collect_list.rs b/native-engine/datafusion-ext-plans/src/agg/collect_list.rs
index c6cc796..7c1cba4 100644
--- a/native-engine/datafusion-ext-plans/src/agg/collect_list.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/collect_list.rs
@@ -203,6 +203,7 @@
                         .into_values(self.arg_type.clone(), false)
                         .collect::<Vec<_>>(),
                     &self.arg_type,
+                    true,
                 )))
             }
             None => ScalarValue::try_from(&self.data_type),
diff --git a/native-engine/datafusion-ext-plans/src/agg/collect_set.rs b/native-engine/datafusion-ext-plans/src/agg/collect_set.rs
index a145ad4..497eb47 100644
--- a/native-engine/datafusion-ext-plans/src/agg/collect_set.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/collect_set.rs
@@ -200,6 +200,7 @@
                 Ok(ScalarValue::List(ScalarValue::new_list(
                     &set.into_iter().collect::<Vec<_>>(),
                     &self.arg_type,
+                    true,
                 )))
             }
             None => ScalarValue::try_from(&self.data_type),
diff --git a/native-engine/datafusion-ext-plans/src/agg/mod.rs b/native-engine/datafusion-ext-plans/src/agg/mod.rs
index 36cdd22..d092cda 100644
--- a/native-engine/datafusion-ext-plans/src/agg/mod.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/mod.rs
@@ -38,7 +38,6 @@
 use arrow::{array::*, datatypes::*};
 use datafusion::{
     common::{Result, ScalarValue},
-    logical_expr::aggregate_function,
     physical_expr::PhysicalExpr,
 };
 use datafusion_ext_commons::df_execution_err;
@@ -196,6 +196,8 @@
     children: &[Arc<dyn PhysicalExpr>],
     input_schema: &SchemaRef,
 ) -> Result<Arc<dyn Agg>> {
+    use datafusion::logical_expr::type_coercion::aggregates::*;
+
     Ok(match agg_function {
         AggFunction::Count => {
             let return_type = DataType::Int64;
@@ -203,10 +205,12 @@
         }
         AggFunction::Sum => {
             let arg_type = children[0].data_type(input_schema)?;
-            let return_type = aggregate_function::AggregateFunction::return_type(
-                &aggregate_function::AggregateFunction::Sum,
-                &[arg_type],
-            )?;
+            let return_type = match arg_type {
+                DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
+                    DataType::Int64
+                }
+                other => sum_return_type(&other)?,
+            };
             Arc::new(sum::AggSum::try_new(
                 Arc::new(TryCastExpr::new(children[0].clone(), return_type.clone())),
                 return_type,
@@ -214,10 +218,7 @@
         }
         AggFunction::Avg => {
             let arg_type = children[0].data_type(input_schema)?;
-            let return_type = aggregate_function::AggregateFunction::return_type(
-                &aggregate_function::AggregateFunction::Avg,
-                &[arg_type],
-            )?;
+            let return_type = avg_return_type("avg", &arg_type)?;
             Arc::new(avg::AggAvg::try_new(
                 Arc::new(TryCastExpr::new(children[0].clone(), return_type.clone())),
                 return_type,
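
The `aggregate_function::AggregateFunction::return_type` helper is no longer available in DataFusion 42; the coercion rules live in `datafusion::logical_expr::type_coercion::aggregates`. The match above pins integral sums to `Int64` (Spark semantics) and defers everything else to `sum_return_type`. A sketch of the helpers:

use arrow::datatypes::DataType;
use datafusion::common::Result;
use datafusion::logical_expr::type_coercion::aggregates::{avg_return_type, sum_return_type};

fn agg_return_types(arg: &DataType) -> Result<(DataType, DataType)> {
    let sum = match arg {
        // Spark keeps integral sums at Int64 regardless of input width
        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
            DataType::Int64
        }
        other => sum_return_type(other)?, // decimals/floats follow DataFusion's rules
    };
    let avg = avg_return_type("avg", arg)?; // e.g. Int32 -> Float64
    Ok((sum, avg))
}
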
diff --git a/native-engine/datafusion-ext-plans/src/agg_exec.rs b/native-engine/datafusion-ext-plans/src/agg_exec.rs
index 2b4c1e0..2e1d2dd 100644
--- a/native-engine/datafusion-ext-plans/src/agg_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/agg_exec.rs
@@ -26,17 +26,19 @@
 use datafusion::{
     common::{Result, Statistics},
     execution::context::TaskContext,
-    physical_expr::PhysicalSortExpr,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties, SendableRecordBatchStream,
     },
 };
 use datafusion_ext_commons::{
     batch_size, slim_bytes::SlimBytes, streams::coalesce_stream::CoalesceInput,
 };
 use futures::{stream::once, StreamExt, TryFutureExt, TryStreamExt};
+use once_cell::sync::OnceCell;
 
 use crate::{
     agg::{
@@ -58,6 +60,7 @@
     input: Arc<dyn ExecutionPlan>,
     agg_ctx: Arc<AggContext>,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl AggExec {
@@ -82,11 +85,16 @@
             input,
             agg_ctx,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         })
     }
 }
 
 impl ExecutionPlan for AggExec {
+    fn name(&self) -> &str {
+        "AggExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -95,16 +103,18 @@
         self.agg_ctx.output_schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.input.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
@@ -115,6 +125,7 @@
             input: children[0].clone(),
             agg_ctx: self.agg_ctx.clone(),
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         }))
     }
 
diff --git a/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs b/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs
index bcfdcf2..aed0c01 100644
--- a/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs
@@ -22,14 +22,16 @@
 use datafusion::{
     common::Result,
     execution::{SendableRecordBatchStream, TaskContext},
-    physical_expr::{Partitioning, PhysicalExpr, PhysicalSortExpr},
+    physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr},
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties,
     },
 };
 use futures::{stream::once, StreamExt, TryStreamExt};
+use once_cell::sync::OnceCell;
 
 use crate::{
     common::{output::TaskOutputter, timer_helper::TimerHelper},
@@ -40,6 +42,7 @@
     input: Arc<dyn ExecutionPlan>,
     keys: Vec<Arc<dyn PhysicalExpr>>,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl BroadcastJoinBuildHashMapExec {
@@ -48,6 +51,7 @@
             input,
             keys,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         }
     }
 }
@@ -65,6 +69,10 @@
 }
 
 impl ExecutionPlan for BroadcastJoinBuildHashMapExec {
+    fn name(&self) -> &str {
+        "BroadcastJoinBuildHashMapExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -73,16 +81,20 @@
         join_hash_map_schema(&self.input.schema())
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.input.output_partitioning().partition_count())
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                Partitioning::UnknownPartitioning(
+                    self.input.output_partitioning().partition_count(),
+                ),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs b/native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs
index 3da0107..e27d7f7 100644
--- a/native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs
@@ -29,12 +29,13 @@
 use datafusion::{
     common::{DataFusionError, JoinSide, Result, Statistics},
     execution::context::TaskContext,
-    physical_expr::{PhysicalExprRef, PhysicalSortExpr},
+    physical_expr::{EquivalenceProperties, PhysicalExprRef},
     physical_plan::{
         joins::utils::JoinOn,
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, Time},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties, SendableRecordBatchStream,
     },
 };
 use datafusion_ext_commons::{
@@ -82,6 +83,7 @@
     is_built: bool, // true for BroadcastHashJoin, false for ShuffledHashJoin
     cached_build_hash_map_id: Option<String>,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl BroadcastJoinExec {
@@ -105,6 +107,7 @@
             is_built,
             cached_build_hash_map_id,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         })
     }
 
@@ -237,6 +240,10 @@
 }
 
 impl ExecutionPlan for BroadcastJoinExec {
+    fn name(&self) -> &str {
+        "BroadcastJoin"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -245,19 +252,21 @@
         self.schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        match self.broadcast_side {
-            JoinSide::Left => self.right.output_partitioning(),
-            JoinSide::Right => self.left.output_partitioning(),
-        }
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                match self.broadcast_side {
+                    JoinSide::Left => self.right.output_partitioning().clone(),
+                    JoinSide::Right => self.left.output_partitioning().clone(),
+                },
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.left.clone(), self.right.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.left, &self.right]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/common/cached_exprs_evaluator.rs b/native-engine/datafusion-ext-plans/src/common/cached_exprs_evaluator.rs
index 9792057..1fa6b99 100644
--- a/native-engine/datafusion-ext-plans/src/common/cached_exprs_evaluator.rs
+++ b/native-engine/datafusion-ext-plans/src/common/cached_exprs_evaluator.rs
@@ -36,8 +36,9 @@
     },
     physical_expr::{
         expressions::{CaseExpr, Column, Literal, NoOp, SCAndExpr, SCOrExpr},
-        scatter, PhysicalExpr, PhysicalExprRef,
+        PhysicalExpr, PhysicalExprRef,
     },
+    physical_expr_common::utils::scatter,
     physical_plan::ColumnarValue,
 };
 use datafusion_ext_commons::{cast::cast, uda::UserDefinedArray};
@@ -208,8 +209,14 @@
             // short circuiting expression - only first child can be cached
             // first `when` expr can also be cached
             collect_dups(&expr.children()[0], current_count, expr_counts, dups);
-            if expr.as_any().downcast_ref::<CaseExpr>().is_some() {
-                collect_dups(&expr.children()[1], current_count, expr_counts, dups);
+            if let Some(case_expr) = expr.as_any().downcast_ref::<CaseExpr>() {
+                if case_expr.expr().is_some() {
+                    let children = case_expr.children();
+                    if children.len() >= 2 {
+                        // cache first `when` expr
+                        collect_dups(&expr.children()[1], current_count, expr_counts, dups);
+                    }
+                }
             }
         } else {
             expr.children().iter().for_each(|child| {
@@ -254,17 +261,25 @@
         {
             // short circuiting expression - only first child can be cached
             // first `when` expr can also be cached
-            let mut children = expr.children();
+            let mut children = expr
+                .children()
+                .iter()
+                .map(|&child| child.clone())
+                .collect::<Vec<_>>();
             children[0] = transform(children[0].clone(), cached_expr_ids, cache)?;
-            if expr.as_any().downcast_ref::<CaseExpr>().is_some() && children.len() >= 2 {
-                children[1] = transform(children[1].clone(), cached_expr_ids, cache)?;
+
+            if let Some(case_expr) = expr.as_any().downcast_ref::<CaseExpr>() {
+                if children.len() >= 2 && case_expr.expr().is_some() {
+                    // cache first `when` expr
+                    children[1] = transform(children[1].clone(), cached_expr_ids, cache)?;
+                }
             }
             expr.clone().with_new_children(children)?
         } else {
             expr.clone().with_new_children(
                 expr.children()
                     .into_iter()
-                    .map(|child| transform(child, cached_expr_ids, cache))
+                    .map(|child| transform(child.clone(), cached_expr_ids, cache))
                     .collect::<Result<_>>()?,
             )?
         };
@@ -349,7 +364,7 @@
         self.cache.get(self.id, || self.orig_expr.evaluate(batch))
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
         self.orig_expr.children()
     }
 
@@ -441,12 +456,13 @@
 
                 let mapped_idx = *used_cols_ref.entry(col.index()).or_insert(new_idx);
                 let mapped_col: PhysicalExprRef = Arc::new(Column::new(col.name(), mapped_idx));
-                Ok(Transformed::Yes(mapped_col))
+                Ok(Transformed::yes(mapped_col))
             } else {
-                Ok(Transformed::Yes(expr))
+                Ok(Transformed::yes(expr))
             }
         })
-        .unwrap();
+        .unwrap()
+        .data;
 
     let mapped_cols: Vec<usize> = used_cols
         .take()
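
The `TreeNode` API also changed: `Transformed` is now a struct built with `Transformed::yes`/`Transformed::no`, and `transform_down` wraps its result, hence the trailing `.data`. A sketch of a column-index rewrite under the new API:

use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::Result;
use datafusion::physical_expr::{expressions::Column, PhysicalExprRef};

fn shift_columns(expr: PhysicalExprRef, offset: usize) -> Result<PhysicalExprRef> {
    let transformed = expr.transform_down(&|node: PhysicalExprRef| {
        if let Some(col) = node.as_any().downcast_ref::<Column>() {
            let shifted: PhysicalExprRef =
                Arc::new(Column::new(col.name(), col.index() + offset));
            Ok(Transformed::yes(shifted))
        } else {
            Ok(Transformed::no(node)) // unchanged nodes are marked `no`
        }
    })?;
    Ok(transformed.data) // the rewritten tree lives in `.data`
}
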
diff --git a/native-engine/datafusion-ext-plans/src/common/column_pruning.rs b/native-engine/datafusion-ext-plans/src/common/column_pruning.rs
index c8c1a3a..9159024 100644
--- a/native-engine/datafusion-ext-plans/src/common/column_pruning.rs
+++ b/native-engine/datafusion-ext-plans/src/common/column_pruning.rs
@@ -76,16 +76,18 @@
     let mapped_exprs: Vec<PhysicalExprRef> = exprs
         .iter()
         .map(|expr| {
-            expr.clone().transform_down(&|node: PhysicalExprRef| {
-                Ok(Transformed::Yes(
-                    if let Some(column) = node.as_any().downcast_ref::<Column>() {
-                        let mapped_idx = required_columns_mapping[&column.index()];
-                        Arc::new(Column::new(column.name(), mapped_idx))
-                    } else {
-                        node
-                    },
-                ))
-            })
+            expr.clone()
+                .transform_down(&|node: PhysicalExprRef| {
+                    Ok(Transformed::yes(
+                        if let Some(column) = node.as_any().downcast_ref::<Column>() {
+                            let mapped_idx = required_columns_mapping[&column.index()];
+                            Arc::new(Column::new(column.name(), mapped_idx))
+                        } else {
+                            node
+                        },
+                    ))
+                })
+                .map(|r| r.data)
         })
         .collect::<Result<_>>()?;
 
diff --git a/native-engine/datafusion-ext-plans/src/debug_exec.rs b/native-engine/datafusion-ext-plans/src/debug_exec.rs
index 76f7790..481204d 100644
--- a/native-engine/datafusion-ext-plans/src/debug_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/debug_exec.rs
@@ -25,20 +25,22 @@
 use datafusion::{
     error::Result,
     execution::context::TaskContext,
-    physical_expr::PhysicalSortExpr,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-        SendableRecordBatchStream, Statistics,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
     },
 };
 use futures::{Stream, StreamExt};
+use once_cell::sync::OnceCell;
 
 #[derive(Debug)]
 pub struct DebugExec {
     input: Arc<dyn ExecutionPlan>,
     debug_id: String,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl DebugExec {
@@ -47,6 +49,7 @@
             input,
             debug_id,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         }
     }
 }
@@ -59,6 +62,10 @@
 
 #[async_trait]
 impl ExecutionPlan for DebugExec {
+    fn name(&self) -> &str {
+        "DebugExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -67,16 +74,18 @@
         self.input.schema()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.input.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.input.output_ordering()
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/empty_partitions_exec.rs b/native-engine/datafusion-ext-plans/src/empty_partitions_exec.rs
index d3a6588..e335de1 100644
--- a/native-engine/datafusion-ext-plans/src/empty_partitions_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/empty_partitions_exec.rs
@@ -25,19 +25,21 @@
 use datafusion::{
     error::Result,
     execution::context::TaskContext,
-    physical_expr::PhysicalSortExpr,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
-        metrics::MetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
-        Partitioning::UnknownPartitioning, RecordBatchStream, SendableRecordBatchStream,
-        Statistics,
+        metrics::MetricsSet, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
+        Partitioning::UnknownPartitioning, PlanProperties, RecordBatchStream,
+        SendableRecordBatchStream, Statistics,
     },
 };
 use futures::Stream;
+use once_cell::sync::OnceCell;
 
 #[derive(Debug, Clone)]
 pub struct EmptyPartitionsExec {
     schema: SchemaRef,
     num_partitions: usize,
+    props: OnceCell<PlanProperties>,
 }
 
 impl EmptyPartitionsExec {
@@ -45,6 +47,7 @@
         Self {
             schema,
             num_partitions,
+            props: OnceCell::new(),
         }
     }
 }
@@ -61,6 +64,10 @@
 
 #[async_trait]
 impl ExecutionPlan for EmptyPartitionsExec {
+    fn name(&self) -> &str {
+        "EmptyPartitionsExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -69,15 +76,17 @@
         self.schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        UnknownPartitioning(self.num_partitions)
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                UnknownPartitioning(self.num_partitions),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
         vec![]
     }
 
diff --git a/native-engine/datafusion-ext-plans/src/expand_exec.rs b/native-engine/datafusion-ext-plans/src/expand_exec.rs
index b1120be..71e0791 100644
--- a/native-engine/datafusion-ext-plans/src/expand_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/expand_exec.rs
@@ -21,17 +21,17 @@
 use datafusion::{
     common::{Result, Statistics},
     execution::context::TaskContext,
-    physical_expr::{PhysicalExpr, PhysicalSortExpr},
+    physical_expr::{EquivalenceProperties, PhysicalExpr},
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
-        Partitioning::UnknownPartitioning,
-        SendableRecordBatchStream,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties, SendableRecordBatchStream,
     },
 };
 use datafusion_ext_commons::{cast::cast, df_execution_err};
 use futures::{stream::once, StreamExt, TryStreamExt};
+use once_cell::sync::OnceCell;
 
 use crate::common::output::TaskOutputter;
 
@@ -41,6 +41,7 @@
     projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
     input: Arc<dyn ExecutionPlan>,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl ExpandExec {
@@ -68,6 +69,7 @@
             projections,
             input,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         })
     }
 }
@@ -79,6 +81,10 @@
 }
 
 impl ExecutionPlan for ExpandExec {
+    fn name(&self) -> &str {
+        "ExpandExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -87,16 +93,18 @@
         self.schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        UnknownPartitioning(0)
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.input.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
@@ -108,6 +116,7 @@
             projections: self.projections.clone(),
             input: children[0].clone(),
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         }))
     }
 
@@ -386,7 +395,7 @@
             "| 103.4       |",
             "| 0.6         |",
             "| 1.15        |",
-            "| 0.0         |",
+            "| -0.0        |",
             "| -1.7        |",
             "| -1.2        |",
             "| -0.29999995 |",
diff --git a/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
index f79c865..10e7ea9 100644
--- a/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
@@ -23,22 +23,24 @@
 use datafusion::{
     error::Result,
     execution::context::TaskContext,
-    physical_expr::PhysicalSortExpr,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
         Partitioning::UnknownPartitioning,
-        SendableRecordBatchStream, Statistics,
+        PlanProperties, SendableRecordBatchStream, Statistics,
     },
 };
 use datafusion_ext_commons::streams::ffi_stream::FFIReaderStream;
 use jni::objects::JObject;
+use once_cell::sync::OnceCell;
 
 pub struct FFIReaderExec {
     num_partitions: usize,
     schema: SchemaRef,
     export_iter_provider_resource_id: String,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl FFIReaderExec {
@@ -52,6 +54,7 @@
             export_iter_provider_resource_id,
             schema,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         }
     }
 }
@@ -69,6 +72,10 @@
 }
 
 impl ExecutionPlan for FFIReaderExec {
+    fn name(&self) -> &str {
+        "FFIReaderExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -77,15 +84,17 @@
         self.schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        UnknownPartitioning(self.num_partitions)
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                UnknownPartitioning(self.num_partitions),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
         vec![]
     }
 
diff --git a/native-engine/datafusion-ext-plans/src/filter_exec.rs b/native-engine/datafusion-ext-plans/src/filter_exec.rs
index fea0a92..ca10294 100644
--- a/native-engine/datafusion-ext-plans/src/filter_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/filter_exec.rs
@@ -18,16 +18,18 @@
 use datafusion::{
     common::{Result, Statistics},
     execution::context::TaskContext,
-    physical_expr::{expressions::Column, PhysicalExprRef, PhysicalSortExpr},
+    physical_expr::{expressions::Column, EquivalenceProperties, PhysicalExprRef},
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties, SendableRecordBatchStream,
     },
 };
 use datafusion_ext_commons::{df_execution_err, streams::coalesce_stream::CoalesceInput};
 use futures::{stream::once, StreamExt, TryStreamExt};
 use itertools::Itertools;
+use once_cell::sync::OnceCell;
 
 use crate::{
     common::{
@@ -44,6 +46,7 @@
     input: Arc<dyn ExecutionPlan>,
     predicates: Vec<PhysicalExprRef>,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl FilterExec {
@@ -66,6 +69,7 @@
             input,
             predicates,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         })
     }
 
@@ -85,6 +89,10 @@
 }
 
 impl ExecutionPlan for FilterExec {
+    fn name(&self) -> &str {
+        "FilterExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -93,16 +101,18 @@
         self.input.schema()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.input.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.input.output_ordering()
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/generate_exec.rs b/native-engine/datafusion-ext-plans/src/generate_exec.rs
index 6c7c7fc..c8d1d99 100644
--- a/native-engine/datafusion-ext-plans/src/generate_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/generate_exec.rs
@@ -27,16 +27,18 @@
 use datafusion::{
     common::{Result, Statistics},
     execution::context::TaskContext,
-    physical_expr::{expressions::Column, PhysicalExpr, PhysicalSortExpr},
+    physical_expr::{expressions::Column, EquivalenceProperties, PhysicalExpr},
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties, SendableRecordBatchStream,
     },
 };
 use datafusion_ext_commons::{batch_size, cast::cast, streams::coalesce_stream::CoalesceInput};
 use futures::{stream::once, StreamExt, TryFutureExt, TryStreamExt};
 use num::integer::Roots;
+use once_cell::sync::OnceCell;
 use parking_lot::Mutex;
 
 use crate::{
@@ -57,6 +59,7 @@
     input: Arc<dyn ExecutionPlan>,
     output_schema: SchemaRef,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl GenerateExec {
@@ -94,6 +97,7 @@
             input,
             output_schema,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         })
     }
 
@@ -117,6 +121,10 @@
 }
 
 impl ExecutionPlan for GenerateExec {
+    fn name(&self) -> &str {
+        "GenerateExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -125,16 +133,18 @@
         self.output_schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.input.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
index 11ed91d..9396e13 100644
--- a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
@@ -35,13 +35,13 @@
 use datafusion::{
     error::{DataFusionError, Result},
     execution::context::TaskContext,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
-        expressions::PhysicalSortExpr,
         metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
         Partitioning::UnknownPartitioning,
-        SendableRecordBatchStream, Statistics,
+        PlanProperties, SendableRecordBatchStream, Statistics,
     },
 };
 use datafusion_ext_commons::{
@@ -50,6 +50,7 @@
 };
 use futures::{stream::once, TryStreamExt};
 use jni::objects::{GlobalRef, JObject};
+use once_cell::sync::OnceCell;
 use parking_lot::Mutex;
 
 use crate::common::{ipc_compression::IpcCompressionReader, output::TaskOutputter};
@@ -60,6 +61,7 @@
     pub ipc_provider_resource_id: String,
     pub schema: SchemaRef,
     pub metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 impl IpcReaderExec {
     pub fn new(
@@ -72,6 +74,7 @@
             ipc_provider_resource_id,
             schema,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         }
     }
 }
@@ -84,6 +87,10 @@
 
 #[async_trait]
 impl ExecutionPlan for IpcReaderExec {
+    fn name(&self) -> &str {
+        "IpcReaderExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -92,15 +99,17 @@
         self.schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        UnknownPartitioning(self.num_partitions)
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                UnknownPartitioning(self.num_partitions),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
         vec![]
     }
 
diff --git a/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs
index 6aa5544..c22d532 100644
--- a/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs
@@ -22,17 +22,18 @@
 use datafusion::{
     error::Result,
     execution::context::TaskContext,
-    physical_expr::PhysicalSortExpr,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-        Statistics,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties, SendableRecordBatchStream, Statistics,
     },
 };
 use datafusion_ext_commons::streams::coalesce_stream::CoalesceInput;
 use futures::{stream::once, StreamExt, TryStreamExt};
 use jni::objects::{GlobalRef, JObject};
+use once_cell::sync::OnceCell;
 
 use crate::common::{
     ipc_compression::IpcCompressionWriter, output::TaskOutputter, timer_helper::TimerHelper,
@@ -43,6 +44,7 @@
     input: Arc<dyn ExecutionPlan>,
     ipc_consumer_resource_id: String,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl IpcWriterExec {
@@ -51,6 +53,7 @@
             input,
             ipc_consumer_resource_id,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         }
     }
 }
@@ -63,6 +66,10 @@
 
 #[async_trait]
 impl ExecutionPlan for IpcWriterExec {
+    fn name(&self) -> &str {
+        "IpcWriterExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -71,16 +78,18 @@
         self.input.schema()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.input.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.input.output_ordering()
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/lib.rs b/native-engine/datafusion-ext-plans/src/lib.rs
index 052eaf2..cd9bc49 100644
--- a/native-engine/datafusion-ext-plans/src/lib.rs
+++ b/native-engine/datafusion-ext-plans/src/lib.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#![allow(incomplete_features)]
+#![allow(internal_features)]
 #![feature(adt_const_params)]
 #![feature(core_intrinsics)]
 #![feature(get_mut_unchecked)]
@@ -50,6 +52,7 @@
 pub mod common;
 pub mod generate;
 pub mod joins;
+mod scan;
 mod shuffle;
 pub mod window;
 
diff --git a/native-engine/datafusion-ext-plans/src/limit_exec.rs b/native-engine/datafusion-ext-plans/src/limit_exec.rs
index 5bdd729..825c89c 100644
--- a/native-engine/datafusion-ext-plans/src/limit_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/limit_exec.rs
@@ -10,20 +10,22 @@
 use datafusion::{
     common::Result,
     execution::context::TaskContext,
-    physical_expr::PhysicalSortExpr,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-        SendableRecordBatchStream, Statistics,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
     },
 };
 use futures::{Stream, StreamExt};
+use once_cell::sync::OnceCell;
 
 #[derive(Debug)]
 pub struct LimitExec {
     input: Arc<dyn ExecutionPlan>,
     limit: u64,
     pub metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl LimitExec {
@@ -32,6 +34,7 @@
             input,
             limit,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         }
     }
 }
@@ -43,6 +46,10 @@
 }
 
 impl ExecutionPlan for LimitExec {
+    fn name(&self) -> &str {
+        "LimitExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -51,16 +58,18 @@
         self.input.schema()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.input.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.input.output_ordering()
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs
index dfca0d9..4ae1139 100644
--- a/native-engine/datafusion-ext-plans/src/orc_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -21,27 +21,32 @@
 use blaze_jni_bridge::{jni_call_static, jni_new_global_ref, jni_new_string};
 use bytes::Bytes;
 use datafusion::{
-    datasource::physical_plan::{
-        FileMeta, FileOpenFuture, FileOpener, FileScanConfig, FileStream, SchemaAdapter,
+    datasource::{
+        physical_plan::{FileMeta, FileOpenFuture, FileOpener, FileScanConfig, FileStream},
+        schema_adapter::SchemaAdapter,
     },
     error::Result,
     execution::context::TaskContext,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
-        expressions::PhysicalSortExpr,
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning, PhysicalExpr,
-        RecordBatchStream, SendableRecordBatchStream, Statistics,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Metric, Partitioning,
+        PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
     },
 };
-use datafusion_ext_commons::{batch_size, hadoop_fs::FsProvider};
+use datafusion_ext_commons::{batch_size, df_execution_err, hadoop_fs::FsProvider};
 use futures::{future::BoxFuture, FutureExt, StreamExt};
 use futures_util::{stream::once, TryStreamExt};
+use once_cell::sync::OnceCell;
 use orc_rust::{
     arrow_reader::ArrowReaderBuilder, projection::ProjectionMask, reader::AsyncChunkReader,
 };
 
-use crate::common::{internal_file_reader::InternalFileReader, output::TaskOutputter};
+use crate::{
+    common::{internal_file_reader::InternalFileReader, output::TaskOutputter},
+    scan::BlazeSchemaAdapter,
+};
 
 /// Execution plan for scanning one or more Orc partitions
 #[derive(Debug, Clone)]
@@ -50,9 +55,9 @@
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
-    projected_output_ordering: Vec<Vec<PhysicalSortExpr>>,
     metrics: ExecutionPlanMetricsSet,
     _predicate: Option<Arc<dyn PhysicalExpr>>,
+    props: OnceCell<PlanProperties>,
 }
 
 impl OrcExec {
@@ -65,7 +70,7 @@
     ) -> Self {
         let metrics = ExecutionPlanMetricsSet::new();
 
-        let (projected_schema, projected_statistics, projected_output_ordering) =
+        let (projected_schema, projected_statistics, _projected_output_ordering) =
             base_config.project();
 
         Self {
@@ -73,9 +78,9 @@
             base_config,
             projected_statistics,
             projected_schema,
-            projected_output_ordering,
             metrics,
             _predicate,
+            props: OnceCell::new(),
         }
     }
 }
@@ -101,6 +106,10 @@
 }
 
 impl ExecutionPlan for OrcExec {
+    fn name(&self) -> &str {
+        "OrcExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -109,17 +118,17 @@
         Arc::clone(&self.projected_schema)
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
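+            // as before: one unknown output partition per file group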
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                Partitioning::UnknownPartitioning(self.base_config.file_groups.len()),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.projected_output_ordering
-            .first()
-            .map(|ordering| ordering.as_slice())
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
         vec![]
     }
 
@@ -218,18 +227,20 @@
         let batch_size = self.batch_size;
         let projection = self.projection.clone();
         let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
-        let schema_adapter = SchemaAdapter::new(projected_schema);
+        let schema_adapter = BlazeSchemaAdapter::new(projected_schema);
 
         Ok(Box::pin(async move {
             let mut builder = ArrowReaderBuilder::try_new_async(reader)
                 .await
-                .map_err(ArrowError::from)?;
+                .or_else(|err| df_execution_err!("create orc reader error: {err}"))?;
             if let Some(range) = file_meta.range.clone() {
                 let range = range.start as usize..range.end as usize;
                 builder = builder.with_file_byte_range(range);
             }
             let file_schema = builder.schema();
-            let (schema_mapping, adapted_projections) = schema_adapter.map_schema(&file_schema)?;
+            let (schema_mapping, adapted_projections) =
+                schema_adapter.map_schema(file_schema.as_ref())?;
+
             // Offset by 1 since index 0 is the root
             let projection = adapted_projections
                 .iter()
diff --git a/native-engine/datafusion-ext-plans/src/parquet_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
index 5291218..28e2591 100644
--- a/native-engine/datafusion-ext-plans/src/parquet_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
@@ -19,18 +19,14 @@
 
 use std::{any::Any, fmt, fmt::Formatter, ops::Range, pin::Pin, sync::Arc};
 
-use arrow::{
-    array::{Array, ArrayRef, AsArray, ListArray},
-    datatypes::{DataType, SchemaRef},
-};
+use arrow::datatypes::SchemaRef;
 use blaze_jni_bridge::{
     conf, conf::BooleanConf, jni_call_static, jni_new_global_ref, jni_new_string,
 };
 use bytes::Bytes;
 use datafusion::{
-    common::DataFusionError,
     datasource::physical_plan::{
-        parquet::{page_filter::PagePruningPredicate, ParquetOpener},
+        parquet::{page_filter::PagePruningAccessPlanFilter, ParquetOpener},
         FileMeta, FileScanConfig, FileStream, OnError, ParquetFileMetrics,
         ParquetFileReaderFactory,
     },
@@ -41,87 +37,28 @@
         errors::ParquetError,
         file::metadata::ParquetMetaData,
     },
+    physical_expr::EquivalenceProperties,
     physical_optimizer::pruning::PruningPredicate,
     physical_plan::{
-        expressions::PhysicalSortExpr,
         metrics::{
             BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet, Time,
         },
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning, PhysicalExpr,
-        RecordBatchStream, SendableRecordBatchStream, Statistics,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Metric, Partitioning,
+        PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
     },
 };
-use datafusion_ext_commons::{batch_size, df_execution_err, hadoop_fs::FsProvider};
+use datafusion_ext_commons::{batch_size, hadoop_fs::FsProvider};
 use fmt::Debug;
 use futures::{future::BoxFuture, stream::once, FutureExt, StreamExt, TryStreamExt};
 use object_store::ObjectMeta;
 use once_cell::sync::OnceCell;
 use parking_lot::Mutex;
 
-use crate::common::{internal_file_reader::InternalFileReader, output::TaskOutputter};
-
-#[no_mangle]
-fn schema_adapter_cast_column(
-    col: &ArrayRef,
-    data_type: &DataType,
-) -> Result<ArrayRef, DataFusionError> {
-    macro_rules! handle_decimal {
-        ($s:ident, $t:ident, $tnative:ty, $prec:expr, $scale:expr) => {{
-            use arrow::{array::*, datatypes::*};
-            type DecimalBuilder = paste::paste! {[<$t Builder>]};
-            type IntType = paste::paste! {[<$s Type>]};
-
-            let col = col.as_primitive::<IntType>();
-            let mut decimal_builder = DecimalBuilder::new();
-            for i in 0..col.len() {
-                if col.is_valid(i) {
-                    decimal_builder.append_value(col.value(i) as $tnative);
-                } else {
-                    decimal_builder.append_null();
-                }
-            }
-            Ok(Arc::new(
-                decimal_builder
-                    .finish()
-                    .with_precision_and_scale($prec, $scale)?,
-            ))
-        }};
-    }
-    match data_type {
-        DataType::Decimal128(prec, scale) => match col.data_type() {
-            DataType::Int8 => handle_decimal!(Int8, Decimal128, i128, *prec, *scale),
-            DataType::Int16 => handle_decimal!(Int16, Decimal128, i128, *prec, *scale),
-            DataType::Int32 => handle_decimal!(Int32, Decimal128, i128, *prec, *scale),
-            DataType::Int64 => handle_decimal!(Int64, Decimal128, i128, *prec, *scale),
-            DataType::Decimal128(p, s) if p == prec && s == scale => Ok(col.clone()),
-            _ => df_execution_err!(
-                "schema_adapter_cast_column unsupported type: {:?} => {:?}",
-                col.data_type(),
-                data_type,
-            ),
-        },
-        DataType::List(to_field) => match col.data_type() {
-            DataType::List(_from_field) => {
-                let col = col.as_list::<i32>();
-                let from_inner = col.values();
-                let to_inner = schema_adapter_cast_column(from_inner, to_field.data_type())?;
-                Ok(Arc::new(ListArray::try_new(
-                    to_field.clone(),
-                    col.offsets().clone(),
-                    to_inner,
-                    col.nulls().cloned(),
-                )?))
-            }
-            _ => df_execution_err!(
-                "schema_adapter_cast_column unsupported type: {:?} => {:?}",
-                col.data_type(),
-                data_type,
-            ),
-        },
-        _ => datafusion_ext_commons::cast::cast_scan_input_array(col.as_ref(), data_type),
-    }
-}
+use crate::{
+    common::{internal_file_reader::InternalFileReader, output::TaskOutputter},
+    scan::BlazeSchemaAdapterFactory,
+};
 
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
@@ -130,11 +67,11 @@
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
-    projected_output_ordering: Vec<Vec<PhysicalSortExpr>>,
     metrics: ExecutionPlanMetricsSet,
     predicate: Option<Arc<dyn PhysicalExpr>>,
     pruning_predicate: Option<Arc<PruningPredicate>>,
-    page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
+    page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
+    props: OnceCell<PlanProperties>,
 }
 
 impl ParquetExec {
@@ -162,20 +99,13 @@
                     }
                 }
             })
-            .filter(|p| !p.allways_true());
+            .filter(|p| !p.always_true());
 
-        let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| {
-            match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) {
-                Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
-                Err(e) => {
-                    log::warn!("Could not create page pruning predicate: {}", e);
-                    predicate_creation_errors.add(1);
-                    None
-                }
-            }
-        });
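+        // PagePruningAccessPlanFilter::new is infallible, so the old
+        // error-handling branch is no longer needed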
+        let page_pruning_predicate = predicate
+            .as_ref()
+            .map(|p| Arc::new(PagePruningAccessPlanFilter::new(p, file_schema.clone())));
 
-        let (projected_schema, projected_statistics, projected_output_ordering) =
+        let (projected_schema, projected_statistics, _projected_output_ordering) =
             base_config.project();
 
         Self {
@@ -183,11 +113,11 @@
             base_config,
             projected_schema,
             projected_statistics,
-            projected_output_ordering,
             metrics,
             predicate,
             pruning_predicate,
             page_pruning_predicate,
+            props: OnceCell::new(),
         }
     }
 }
@@ -217,6 +147,10 @@
 }
 
 impl ExecutionPlan for ParquetExec {
+    fn name(&self) -> &str {
+        "ParquetExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -225,25 +159,20 @@
         Arc::clone(&self.projected_schema)
     }
 
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
         vec![]
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                Partitioning::UnknownPartitioning(self.base_config.file_groups.len()),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.projected_output_ordering
-            .first()
-            .map(|ordering| ordering.as_slice())
-    }
-
-    // in datafusion 20.0.0 ExecutionPlan trait not include relies_on_input_order
-    // fn relies_on_input_order(&self) -> bool {
-    //     false
-    // }
-
     fn with_new_children(
         self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
@@ -275,6 +204,7 @@
         let fs = jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?;
         let fs_provider = Arc::new(FsProvider::new(jni_new_global_ref!(fs.as_obj())?, &io_time));
 
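+        // Blaze's adapter (scan/mod.rs) matches columns case-insensitively and
+        // applies Spark-compatible casts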
+        let schema_adapter_factory = Arc::new(BlazeSchemaAdapterFactory);
         let projection = match self.base_config.file_column_projection_indices() {
             Some(proj) => proj,
             None => (0..self.base_config.file_schema.fields().len()).collect(),
@@ -299,6 +229,7 @@
             reorder_filters: page_filtering_enabled,
             enable_page_index: page_filtering_enabled,
             enable_bloom_filter: bloom_filter_enabled,
+            schema_adapter_factory,
         };
 
         let mut file_stream =
@@ -434,7 +365,7 @@
 
         let inner = self.0.clone();
         let meta_size = inner.get_meta().size;
-        let size_hint = Some(1048576);
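+        // drop the fixed 1MiB footer hint; let the reader fetch what it needs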
+        let size_hint = None;
         let cache_slot = (move || {
             let mut metadata_cache = METADATA_CACHE.get_or_init(|| Mutex::new(Vec::new())).lock();
 
diff --git a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
index 00ef3f1..8b00a7d 100644
--- a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
@@ -31,12 +31,12 @@
         file::properties::{EnabledStatistics, WriterProperties, WriterVersion},
         schema::{parser::parse_message_type, types::SchemaDescriptor},
     },
-    physical_expr::PhysicalSortExpr,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
         metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning,
-        SendableRecordBatchStream,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        Metric, PlanProperties, SendableRecordBatchStream,
     },
 };
 use datafusion_ext_commons::{
@@ -46,6 +46,7 @@
     hadoop_fs::{FsDataOutputStream, FsProvider},
 };
 use futures::{stream::once, StreamExt, TryStreamExt};
+use once_cell::sync::OnceCell;
 use parking_lot::Mutex;
 
 use crate::common::output::TaskOutputter;
@@ -57,6 +58,7 @@
     num_dyn_parts: usize,
     props: Vec<(String, String)>,
     metrics: ExecutionPlanMetricsSet,
+    plan_props: OnceCell<PlanProperties>,
 }
 
 impl ParquetSinkExec {
@@ -72,6 +74,7 @@
             num_dyn_parts,
             props,
             metrics: ExecutionPlanMetricsSet::new(),
+            plan_props: OnceCell::new(),
         }
     }
 }
@@ -83,6 +86,10 @@
 }
 
 impl ExecutionPlan for ParquetSinkExec {
+    fn name(&self) -> &str {
+        "ParquetSinkExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -91,16 +98,18 @@
         self.input.schema()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+    fn properties(&self) -> &PlanProperties {
+        self.plan_props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.input.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.input.output_ordering()
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/project_exec.rs b/native-engine/datafusion-ext-plans/src/project_exec.rs
index ef974d8..617df65 100644
--- a/native-engine/datafusion-ext-plans/src/project_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/project_exec.rs
@@ -21,16 +21,18 @@
 use datafusion::{
     common::{Result, Statistics},
     execution::TaskContext,
-    physical_expr::{PhysicalExprRef, PhysicalSortExpr},
+    physical_expr::{EquivalenceProperties, PhysicalExprRef},
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties, SendableRecordBatchStream,
     },
 };
 use datafusion_ext_commons::streams::coalesce_stream::CoalesceInput;
 use futures::{stream::once, FutureExt, StreamExt, TryStreamExt};
 use itertools::Itertools;
+use once_cell::sync::OnceCell;
 
 use crate::{
     common::{
@@ -48,6 +50,7 @@
     input: Arc<dyn ExecutionPlan>,
     schema: SchemaRef,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl ProjectExec {
@@ -73,6 +76,7 @@
             input,
             schema,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         })
     }
 }
@@ -91,6 +95,10 @@
 }
 
 impl ExecutionPlan for ProjectExec {
+    fn name(&self) -> &str {
+        "ProjectExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -99,16 +107,18 @@
         self.schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.input.output_partitioning().partition_count())
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
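+            // forward the input partitioning instead of the old
+            // UnknownPartitioning(partition_count) placeholder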
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.input.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
@@ -181,6 +191,7 @@
             expr: projection.iter().map(|&i| self.expr[i].clone()).collect(),
             schema: Arc::new(self.schema.project(projection)?),
             metrics: self.metrics.clone(),
+            props: OnceCell::new(),
         });
         projected_project.execute(partition, context)
     }
diff --git a/native-engine/datafusion-ext-plans/src/rename_columns_exec.rs b/native-engine/datafusion-ext-plans/src/rename_columns_exec.rs
index 69b46cf..1642a40 100644
--- a/native-engine/datafusion-ext-plans/src/rename_columns_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/rename_columns_exec.rs
@@ -28,14 +28,15 @@
 use datafusion::{
     error::Result,
     execution::context::TaskContext,
-    physical_expr::PhysicalSortExpr,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-        SendableRecordBatchStream, Statistics,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
     },
 };
 use futures::{Stream, StreamExt};
+use once_cell::sync::OnceCell;
 
 use crate::agg::AGG_BUF_COLUMN_NAME;
 
@@ -45,6 +46,7 @@
     renamed_column_names: Vec<String>,
     renamed_schema: SchemaRef,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl RenameColumnsExec {
@@ -88,6 +90,7 @@
             renamed_column_names,
             renamed_schema,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         })
     }
 }
@@ -100,6 +103,10 @@
 
 #[async_trait]
 impl ExecutionPlan for RenameColumnsExec {
+    fn name(&self) -> &str {
+        "RenameColumnsExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -108,16 +115,18 @@
         self.renamed_schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.input.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.input.output_ordering()
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs b/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs
index bec2aa6..af981c5 100644
--- a/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs
@@ -22,15 +22,16 @@
     arrow::datatypes::SchemaRef,
     error::{DataFusionError, Result},
     execution::context::TaskContext,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
-        expressions::PhysicalSortExpr,
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-        Statistics,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
+        SendableRecordBatchStream, Statistics,
     },
 };
 use futures::{stream::once, TryStreamExt};
+use once_cell::sync::OnceCell;
 
 use crate::{
     common::timer_helper::RegisterTimer,
@@ -46,14 +47,11 @@
 /// order of the resulting partitions.
 #[derive(Debug)]
 pub struct RssShuffleWriterExec {
-    /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
-    /// Partitioning scheme to use
     partitioning: Partitioning,
-    /// scala rssShuffleWriter
     pub rss_partition_writer_resource_id: String,
-    /// Metrics
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl DisplayAs for RssShuffleWriterExec {
@@ -68,26 +66,30 @@
 
 #[async_trait]
 impl ExecutionPlan for RssShuffleWriterExec {
-    /// Return a reference to Any that can be used for downcasting
+    fn name(&self) -> &str {
+        "RssShuffleWriterExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
 
-    /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef {
         self.input.schema()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.partitioning.clone()
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.partitioning.clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
@@ -172,6 +174,7 @@
             partitioning,
             rss_partition_writer_resource_id,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         })
     }
 }
diff --git a/native-engine/datafusion-ext-plans/src/scan/mod.rs b/native-engine/datafusion-ext-plans/src/scan/mod.rs
new file mode 100644
index 0000000..f616d5d
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/scan/mod.rs
@@ -0,0 +1,187 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{fmt::Debug, sync::Arc};
+
+use arrow::{
+    array::{new_null_array, Array, ArrayRef, AsArray, ListArray, RecordBatch, RecordBatchOptions},
+    datatypes::{DataType, Schema, SchemaRef},
+};
+use datafusion::{
+    common::Result,
+    datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper},
+};
+use datafusion_ext_commons::df_execution_err;
+
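+/// Replaces datafusion's default schema adapter with one that resolves columns
+/// case-insensitively and applies Spark-compatible casts.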
+#[derive(Debug)]
+pub struct BlazeSchemaAdapterFactory;
+
+impl SchemaAdapterFactory for BlazeSchemaAdapterFactory {
+    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Box::new(BlazeSchemaAdapter::new(schema))
+    }
+}
+
+pub struct BlazeSchemaAdapter {
+    table_schema: SchemaRef,
+}
+
+impl BlazeSchemaAdapter {
+    pub fn new(table_schema: SchemaRef) -> Self {
+        Self { table_schema }
+    }
+}
+
+impl SchemaAdapter for BlazeSchemaAdapter {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.table_schema.field(index);
+
+        // use case insensitive matching
+        file_schema
+            .fields
+            .iter()
+            .position(|f| f.name().eq_ignore_ascii_case(field.name()))
+    }
+
+    fn map_schema(&self, file_schema: &Schema) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+        let mut field_mappings = vec![None; self.table_schema.fields().len()];
+
+        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+            // resolve the table column case-insensitively, consistent with
+            // map_column_index above (Fields::find is case-sensitive)
+            if let Some(table_idx) = self
+                .table_schema
+                .fields()
+                .iter()
+                .position(|f| f.name().eq_ignore_ascii_case(file_field.name()))
+            {
+                field_mappings[table_idx] = Some(projection.len());
+                projection.push(file_idx);
+            }
+        }
+
+        Ok((
+            Arc::new(BlazeSchemaMapping {
+                table_schema: self.table_schema.clone(),
+                field_mappings,
+            }),
+            projection,
+        ))
+    }
+}
+
+#[derive(Debug)]
+pub struct BlazeSchemaMapping {
+    table_schema: SchemaRef,
+    field_mappings: Vec<Option<usize>>,
+}
+
+impl SchemaMapper for BlazeSchemaMapping {
+    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
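+        // produce columns in table-schema order: cast the ones present in the
+        // file, fill the missing ones with nulls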
+        let batch_rows = batch.num_rows();
+        let batch_cols = batch.columns().to_vec();
+
+        let cols = self
+            .table_schema
+            .fields()
+            .iter()
+            .zip(&self.field_mappings)
+            .map(|(field, file_idx)| match file_idx {
+                Some(batch_idx) => {
+                    schema_adapter_cast_column(&batch_cols[*batch_idx], field.data_type())
+                }
+                None => Ok(new_null_array(field.data_type(), batch_rows)),
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let options = RecordBatchOptions::new().with_row_count(Some(batch_rows));
+        let schema = self.table_schema.clone();
+        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+        Ok(record_batch)
+    }
+
+    fn map_partial_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        let batch_cols = batch.columns().to_vec();
+        let schema = batch.schema();
+
+        let mut cols = vec![];
+        let mut fields = vec![];
+        for (i, f) in schema.fields().iter().enumerate() {
+            // field_with_name is case-sensitive; use the same case-insensitive
+            // lookup as map_schema
+            let table_field = self
+                .table_schema
+                .fields()
+                .iter()
+                .find(|tf| tf.name().eq_ignore_ascii_case(f.name()));
+            if let Some(tf) = table_field {
+                cols.push(schema_adapter_cast_column(&batch_cols[i], tf.data_type())?);
+                fields.push(tf.clone());
+            }
+        }
+
+        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+        let schema = Arc::new(Schema::new(fields));
+        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+        Ok(record_batch)
+    }
+}
+
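+/// Casts a scanned column to the table type, widening integer columns to
+/// Decimal128 and recursing into list elements.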
+fn schema_adapter_cast_column(col: &ArrayRef, data_type: &DataType) -> Result<ArrayRef> {
+    macro_rules! handle_decimal {
+        ($s:ident, $t:ident, $tnative:ty, $prec:expr, $scale:expr) => {{
+            use arrow::{array::*, datatypes::*};
+            type DecimalBuilder = paste::paste! {[<$t Builder>]};
+            type IntType = paste::paste! {[<$s Type>]};
+
+            let col = col.as_primitive::<IntType>();
+            let mut decimal_builder = DecimalBuilder::new();
+            for i in 0..col.len() {
+                if col.is_valid(i) {
+                    decimal_builder.append_value(col.value(i) as $tnative);
+                } else {
+                    decimal_builder.append_null();
+                }
+            }
+            Ok(Arc::new(
+                decimal_builder
+                    .finish()
+                    .with_precision_and_scale($prec, $scale)?,
+            ))
+        }};
+    }
+    match data_type {
+        DataType::Decimal128(prec, scale) => match col.data_type() {
+            DataType::Int8 => handle_decimal!(Int8, Decimal128, i128, *prec, *scale),
+            DataType::Int16 => handle_decimal!(Int16, Decimal128, i128, *prec, *scale),
+            DataType::Int32 => handle_decimal!(Int32, Decimal128, i128, *prec, *scale),
+            DataType::Int64 => handle_decimal!(Int64, Decimal128, i128, *prec, *scale),
+            DataType::Decimal128(p, s) if p == prec && s == scale => Ok(col.clone()),
+            _ => df_execution_err!(
+                "schema_adapter_cast_column unsupported type: {:?} => {:?}",
+                col.data_type(),
+                data_type,
+            ),
+        },
+        DataType::List(to_field) => match col.data_type() {
+            DataType::List(_from_field) => {
+                let col = col.as_list::<i32>();
+                let from_inner = col.values();
+                let to_inner = schema_adapter_cast_column(from_inner, to_field.data_type())?;
+                Ok(Arc::new(ListArray::try_new(
+                    to_field.clone(),
+                    col.offsets().clone(),
+                    to_inner,
+                    col.nulls().cloned(),
+                )?))
+            }
+            _ => df_execution_err!(
+                "schema_adapter_cast_column unsupported type: {:?} => {:?}",
+                col.data_type(),
+                data_type,
+            ),
+        },
+        _ => datafusion_ext_commons::cast::cast_scan_input_array(col.as_ref(), data_type),
+    }
+}
diff --git a/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs b/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs
index d375d42..c389dcb 100644
--- a/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs
@@ -21,16 +21,17 @@
 use datafusion::{
     error::Result,
     execution::context::TaskContext,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
-        expressions::PhysicalSortExpr,
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-        Statistics,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
+        SendableRecordBatchStream, Statistics,
     },
 };
 use datafusion_ext_commons::df_execution_err;
 use futures::{stream::once, TryStreamExt};
+use once_cell::sync::OnceCell;
 
 use crate::{
     common::{
@@ -49,16 +50,12 @@
 /// the resulting partitions.
 #[derive(Debug)]
 pub struct ShuffleWriterExec {
-    /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
-    /// Partitioning scheme to use
     partitioning: Partitioning,
-    /// Output data file path
     output_data_file: String,
-    /// Output index file path
     output_index_file: String,
-    /// Metrics
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl DisplayAs for ShuffleWriterExec {
@@ -69,26 +66,30 @@
 
 #[async_trait]
 impl ExecutionPlan for ShuffleWriterExec {
-    /// Return a reference to Any that can be used for downcasting
+    fn name(&self) -> &str {
+        "ShuffleWriterExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
 
-    /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef {
         self.input.schema()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.partitioning.clone()
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.partitioning.clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
@@ -179,6 +180,7 @@
             metrics: ExecutionPlanMetricsSet::new(),
             output_data_file,
             output_index_file,
+            props: OnceCell::new(),
         })
     }
 }
diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs b/native-engine/datafusion-ext-plans/src/sort_exec.rs
index 336932b..8d7a3a9 100644
--- a/native-engine/datafusion-ext-plans/src/sort_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs
@@ -36,11 +36,12 @@
 use datafusion::{
     common::{Result, Statistics},
     execution::context::TaskContext,
-    physical_expr::{expressions::Column, PhysicalSortExpr},
+    physical_expr::{expressions::Column, EquivalenceProperties, PhysicalSortExpr},
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties, SendableRecordBatchStream,
     },
 };
 use datafusion_ext_commons::{
@@ -81,6 +82,7 @@
     exprs: Vec<PhysicalSortExpr>,
     fetch: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl SortExec {
@@ -95,6 +97,7 @@
             exprs,
             fetch,
             metrics,
+            props: OnceCell::new(),
         }
     }
 }
@@ -112,6 +115,10 @@
 }
 
 impl ExecutionPlan for SortExec {
+    fn name(&self) -> &str {
+        "SortExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -120,28 +127,29 @@
         self.input.schema()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.input.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        Some(&self.exprs)
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(Self {
-            input: children[0].clone(),
-            exprs: self.exprs.clone(),
-            fetch: self.fetch,
-            metrics: ExecutionPlanMetricsSet::new(),
-        }))
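+        // rebuild via new() so metrics and the cached plan properties are reset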
+        Ok(Arc::new(Self::new(
+            children[0].clone(),
+            self.exprs.clone(),
+            self.fetch,
+        )))
     }
 
     fn execute(
@@ -1352,12 +1360,15 @@
 mod fuzztest {
     use std::sync::Arc;
 
-    use arrow::{compute::SortOptions, record_batch::RecordBatch};
+    use arrow::{
+        array::{ArrayRef, UInt32Array},
+        compute::{concat_batches, SortOptions},
+        record_batch::RecordBatch,
+    };
     use datafusion::{
-        common::{Result, ScalarValue},
-        logical_expr::ColumnarValue,
-        physical_expr::{expressions::Column, math_expressions::random, PhysicalSortExpr},
-        physical_plan::{coalesce_batches::concat_batches, memory::MemoryExec},
+        common::Result,
+        physical_expr::{expressions::Column, PhysicalSortExpr},
+        physical_plan::memory::MemoryExec,
         prelude::{SessionConfig, SessionContext},
     };
 
@@ -1375,13 +1386,26 @@
         let mut batches = vec![];
         let mut num_rows = 0;
         while num_rows < n {
-            let nulls = ScalarValue::Null
-                .to_array_of_size((n - num_rows).min(10000))
-                .unwrap();
-            let rand_key1 = random(&[ColumnarValue::Array(nulls.clone())])?.into_array(0)?;
-            let rand_key2 = random(&[ColumnarValue::Array(nulls.clone())])?.into_array(0)?;
-            let rand_val1 = random(&[ColumnarValue::Array(nulls.clone())])?.into_array(0)?;
-            let rand_val2 = random(&[ColumnarValue::Array(nulls.clone())])?.into_array(0)?;
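+            // math_expressions::random is gone upstream; generate random u32
+            // columns directly with the rand crate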
+            let num_new_rows = (n - num_rows).min(10000);
+            let rand_col = || -> ArrayRef {
+                Arc::new(UInt32Array::from_iter_values(
+                    std::iter::repeat_with(rand::random::<u32>).take(num_new_rows),
+                ))
+            };
+            let rand_key1 = rand_col();
+            let rand_key2 = rand_col();
+            let rand_val1 = rand_col();
+            let rand_val2 = rand_col();
             let batch = RecordBatch::try_from_iter_with_nullable(vec![
                 ("k1", rand_key1, true),
                 ("k2", rand_key2, true),
@@ -1410,7 +1434,7 @@
         )?);
         let sort = Arc::new(SortExec::new(input, sort_exprs.clone(), None));
         let output = datafusion::physical_plan::collect(sort, task_ctx.clone()).await?;
-        let a = concat_batches(&schema, &output, n)?;
+        let a = concat_batches(&schema, &output)?;
 
         let input = Arc::new(MemoryExec::try_new(
             &[batches.clone()],
@@ -1422,7 +1446,7 @@
             input,
         ));
         let output = datafusion::physical_plan::collect(sort, task_ctx.clone()).await?;
-        let b = concat_batches(&schema, &output, n)?;
+        let b = concat_batches(&schema, &output)?;
 
         assert_eq!(a.num_rows(), b.num_rows());
         assert!(a == b);
diff --git a/native-engine/datafusion-ext-plans/src/sort_merge_join_exec.rs b/native-engine/datafusion-ext-plans/src/sort_merge_join_exec.rs
index 75b8f63..db4c6e2 100644
--- a/native-engine/datafusion-ext-plans/src/sort_merge_join_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/sort_merge_join_exec.rs
@@ -20,19 +20,20 @@
     common::{DataFusionError, JoinSide},
     error::Result,
     execution::context::TaskContext,
-    physical_expr::{PhysicalExprRef, PhysicalSortExpr},
+    physical_expr::{EquivalenceProperties, PhysicalExprRef},
     physical_plan::{
         joins::utils::JoinOn,
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, Time},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-        Statistics,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PlanProperties, SendableRecordBatchStream, Statistics,
     },
 };
 use datafusion_ext_commons::{
     batch_size, df_execution_err, streams::coalesce_stream::CoalesceInput,
 };
 use futures::TryStreamExt;
+use once_cell::sync::OnceCell;
 
 use crate::{
     common::{
@@ -62,6 +63,7 @@
     sort_options: Vec<SortOptions>,
     schema: SchemaRef,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl SortMergeJoinExec {
@@ -81,6 +83,7 @@
             join_type,
             sort_options,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         })
     }
 
@@ -177,6 +180,10 @@
 }
 
 impl ExecutionPlan for SortMergeJoinExec {
+    fn name(&self) -> &str {
+        "SortMergeJoinExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -185,21 +192,18 @@
         self.schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.right.output_partitioning()
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
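+            // partitioning still follows the right side; the per-join-type
+            // output ordering is no longer propagated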
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.right.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        match self.join_type {
-            Left | LeftSemi | LeftAnti | Existence => self.left.output_ordering(),
-            Right | RightSemi | RightAnti => self.right.output_ordering(),
-            Inner => self.left.output_ordering(),
-            Full => None,
-        }
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.left.clone(), self.right.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.left, &self.right]
     }
 
     fn with_new_children(
diff --git a/native-engine/datafusion-ext-plans/src/window_exec.rs b/native-engine/datafusion-ext-plans/src/window_exec.rs
index 648463c..db83617 100644
--- a/native-engine/datafusion-ext-plans/src/window_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/window_exec.rs
@@ -23,16 +23,17 @@
 use datafusion::{
     common::{Result, Statistics},
     execution::context::TaskContext,
-    physical_expr::PhysicalSortExpr,
+    physical_expr::{EquivalenceProperties, PhysicalSortExpr},
     physical_plan::{
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
-        SendableRecordBatchStream,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
+        PhysicalExpr, PlanProperties, SendableRecordBatchStream,
     },
 };
 use datafusion_ext_commons::{cast::cast, streams::coalesce_stream::CoalesceInput};
 use futures::{stream::once, StreamExt, TryFutureExt, TryStreamExt};
+use once_cell::sync::OnceCell;
 
 use crate::{
     common::output::TaskOutputter,
@@ -44,6 +45,7 @@
     input: Arc<dyn ExecutionPlan>,
     context: Arc<WindowContext>,
     metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
 }
 
 impl WindowExec {
@@ -63,6 +65,7 @@
             input,
             context,
             metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
         })
     }
 }
@@ -74,6 +77,10 @@
 }
 
 impl ExecutionPlan for WindowExec {
+    fn name(&self) -> &str {
+        "WindowExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -82,16 +89,18 @@
         self.context.output_schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                self.input.output_partitioning().clone(),
+                ExecutionMode::Bounded,
+            )
+        })
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.input.output_ordering()
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(
diff --git a/pom.xml b/pom.xml
index c212956..0f2f761 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
   <properties>
     <revision>3.0.1-SNAPSHOT</revision>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <arrowVersion>15.0.2</arrowVersion>
+    <arrowVersion>16.0.0</arrowVersion>
     <protobufVersion>3.21.9</protobufVersion>
     <hadoopClientApiVersion>3.4.0</hadoopClientApiVersion>
   </properties>