WIP: initialize GPU support with libcudf
diff --git a/Cargo.lock b/Cargo.lock
index 2d4f340..26dffd7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -521,6 +521,19 @@
 ]
 
 [[package]]
+name = "blaze-cudf-bridge"
+version = "0.1.0"
+dependencies = [
+ "arrow",
+ "datafusion",
+ "datafusion-ext-commons",
+ "log",
+ "num",
+ "parquet",
+ "paste",
+]
+
+[[package]]
 name = "blaze-jni-bridge"
 version = "0.1.0"
 dependencies = [
@@ -1099,6 +1112,7 @@
  "async-trait",
  "base64 0.22.1",
  "bitvec",
+ "blaze-cudf-bridge",
  "blaze-jni-bridge",
  "byteorder",
  "bytes 1.10.1",
diff --git a/Cargo.toml b/Cargo.toml
index e280a56..4972028 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@
     "native-engine/blaze",
     "native-engine/blaze-jni-bridge",
     "native-engine/blaze-serde",
+    "native-engine/blaze-cudf-bridge",
 ]
 
 [profile.release]
@@ -45,6 +46,7 @@
 [workspace.dependencies]
 blaze = { path = "./native-engine/blaze" }
 blaze-jni-bridge = { path = "./native-engine/blaze-jni-bridge" }
+blaze-cudf-bridge = { path = "./native-engine/blaze-cudf-bridge" }
 blaze-serde = { path = "./native-engine/blaze-serde" }
 datafusion-ext-commons = { path = "./native-engine/datafusion-ext-commons" }
 datafusion-ext-exprs = { path = "./native-engine/datafusion-ext-exprs" }
diff --git a/native-engine/blaze-cudf-bridge/Cargo.lock b/native-engine/blaze-cudf-bridge/Cargo.lock
new file mode 100644
index 0000000..8c47323
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/Cargo.lock
@@ -0,0 +1,1248 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 4
+
+[[package]]
+name = "adler2"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
+
+[[package]]
+name = "ahash"
+version = "0.8.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
+dependencies = [
+ "cfg-if",
+ "const-random",
+ "getrandom 0.2.16",
+ "once_cell",
+ "version_check",
+ "zerocopy",
+]
+
+[[package]]
+name = "aho-corasick"
+version = "1.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
+name = "alloc-no-stdlib"
+version = "2.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"
+
+[[package]]
+name = "alloc-stdlib"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
+dependencies = [
+ "alloc-no-stdlib",
+]
+
+[[package]]
+name = "android-tzdata"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
+
+[[package]]
+name = "android_system_properties"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
+dependencies = [
+ "libc",
+]
+
+[[package]]
+name = "arrow"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3095aaf545942ff5abd46654534f15b03a90fba78299d661e045e5d587222f0d"
+dependencies = [
+ "arrow-arith",
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-cast",
+ "arrow-csv",
+ "arrow-data",
+ "arrow-ipc",
+ "arrow-json",
+ "arrow-ord",
+ "arrow-row",
+ "arrow-schema",
+ "arrow-select",
+ "arrow-string",
+]
+
+[[package]]
+name = "arrow-arith"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "00752064ff47cee746e816ddb8450520c3a52cbad1e256f6fa861a35f86c45e7"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "chrono",
+ "num",
+]
+
+[[package]]
+name = "arrow-array"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cebfe926794fbc1f49ddd0cdaf898956ca9f6e79541efce62dabccfd81380472"
+dependencies = [
+ "ahash",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "chrono",
+ "half",
+ "hashbrown",
+ "num",
+]
+
+[[package]]
+name = "arrow-buffer"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0303c7ec4cf1a2c60310fc4d6bbc3350cd051a17bf9e9c0a8e47b4db79277824"
+dependencies = [
+ "bytes",
+ "half",
+ "num",
+]
+
+[[package]]
+name = "arrow-cast"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "335f769c5a218ea823d3760a743feba1ef7857cba114c01399a891c2fff34285"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "arrow-select",
+ "atoi",
+ "base64",
+ "chrono",
+ "comfy-table",
+ "half",
+ "lexical-core",
+ "num",
+ "ryu",
+]
+
+[[package]]
+name = "arrow-csv"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "510db7dfbb4d5761826516cc611d97b3a68835d0ece95b034a052601109c0b1b"
+dependencies = [
+ "arrow-array",
+ "arrow-cast",
+ "arrow-schema",
+ "chrono",
+ "csv",
+ "csv-core",
+ "lazy_static",
+ "regex",
+]
+
+[[package]]
+name = "arrow-data"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e8affacf3351a24039ea24adab06f316ded523b6f8c3dbe28fbac5f18743451b"
+dependencies = [
+ "arrow-buffer",
+ "arrow-schema",
+ "half",
+ "num",
+]
+
+[[package]]
+name = "arrow-ipc"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "69880a9e6934d9cba2b8630dd08a3463a91db8693b16b499d54026b6137af284"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "flatbuffers",
+]
+
+[[package]]
+name = "arrow-json"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d8dafd17a05449e31e0114d740530e0ada7379d7cb9c338fd65b09a8130960b0"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-cast",
+ "arrow-data",
+ "arrow-schema",
+ "chrono",
+ "half",
+ "indexmap",
+ "lexical-core",
+ "memchr",
+ "num",
+ "serde",
+ "serde_json",
+ "simdutf8",
+]
+
+[[package]]
+name = "arrow-ord"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "895644523af4e17502d42c3cb6b27cb820f0cb77954c22d75c23a85247c849e1"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "arrow-select",
+]
+
+[[package]]
+name = "arrow-row"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9be8a2a4e5e7d9c822b2b8095ecd77010576d824f654d347817640acfc97d229"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "half",
+]
+
+[[package]]
+name = "arrow-schema"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7450c76ab7c5a6805be3440dc2e2096010da58f7cab301fdc996a4ee3ee74e49"
+dependencies = [
+ "bitflags",
+]
+
+[[package]]
+name = "arrow-select"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aa5f5a93c75f46ef48e4001535e7b6c922eeb0aa20b73cf58d09e13d057490d8"
+dependencies = [
+ "ahash",
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "num",
+]
+
+[[package]]
+name = "arrow-string"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e7005d858d84b56428ba2a98a107fe88c0132c61793cf6b8232a1f9bfc0452b"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "arrow-select",
+ "memchr",
+ "num",
+ "regex",
+ "regex-syntax",
+]
+
+[[package]]
+name = "atoi"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528"
+dependencies = [
+ "num-traits",
+]
+
+[[package]]
+name = "autocfg"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
+
+[[package]]
+name = "base64"
+version = "0.22.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
+
+[[package]]
+name = "bitflags"
+version = "2.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"
+
+[[package]]
+name = "blaze-cudf-bridge"
+version = "0.1.0"
+dependencies = [
+ "arrow",
+ "log",
+ "num",
+ "parquet",
+ "paste",
+]
+
+[[package]]
+name = "brotli"
+version = "7.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd"
+dependencies = [
+ "alloc-no-stdlib",
+ "alloc-stdlib",
+ "brotli-decompressor",
+]
+
+[[package]]
+name = "brotli-decompressor"
+version = "4.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a334ef7c9e23abf0ce748e8cd309037da93e606ad52eb372e4ce327a0dcfbdfd"
+dependencies = [
+ "alloc-no-stdlib",
+ "alloc-stdlib",
+]
+
+[[package]]
+name = "bumpalo"
+version = "3.17.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
+
+[[package]]
+name = "byteorder"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
+
+[[package]]
+name = "bytes"
+version = "1.10.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
+
+[[package]]
+name = "cc"
+version = "1.2.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8691782945451c1c383942c4874dbe63814f61cb57ef773cda2972682b7bb3c0"
+dependencies = [
+ "jobserver",
+ "libc",
+ "shlex",
+]
+
+[[package]]
+name = "cfg-if"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
+
+[[package]]
+name = "chrono"
+version = "0.4.41"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d"
+dependencies = [
+ "android-tzdata",
+ "iana-time-zone",
+ "num-traits",
+ "windows-link",
+]
+
+[[package]]
+name = "comfy-table"
+version = "7.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4a65ebfec4fb190b6f90e944a817d60499ee0744e582530e2c9900a22e591d9a"
+dependencies = [
+ "unicode-segmentation",
+ "unicode-width",
+]
+
+[[package]]
+name = "const-random"
+version = "0.1.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359"
+dependencies = [
+ "const-random-macro",
+]
+
+[[package]]
+name = "const-random-macro"
+version = "0.1.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
+dependencies = [
+ "getrandom 0.2.16",
+ "once_cell",
+ "tiny-keccak",
+]
+
+[[package]]
+name = "core-foundation-sys"
+version = "0.8.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
+
+[[package]]
+name = "crc32fast"
+version = "1.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3"
+dependencies = [
+ "cfg-if",
+]
+
+[[package]]
+name = "crunchy"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929"
+
+[[package]]
+name = "csv"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf"
+dependencies = [
+ "csv-core",
+ "itoa",
+ "ryu",
+ "serde",
+]
+
+[[package]]
+name = "csv-core"
+version = "0.1.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7d02f3b0da4c6504f86e9cd789d8dbafab48c2321be74e9987593de5a894d93d"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
+name = "equivalent"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
+
+[[package]]
+name = "flatbuffers"
+version = "25.2.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1045398c1bfd89168b5fd3f1fc11f6e70b34f6f66300c87d44d3de849463abf1"
+dependencies = [
+ "bitflags",
+ "rustc_version",
+]
+
+[[package]]
+name = "flate2"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece"
+dependencies = [
+ "crc32fast",
+ "libz-rs-sys",
+ "miniz_oxide",
+]
+
+[[package]]
+name = "getrandom"
+version = "0.2.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "wasi 0.11.0+wasi-snapshot-preview1",
+]
+
+[[package]]
+name = "getrandom"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "r-efi",
+ "wasi 0.14.2+wasi-0.2.4",
+]
+
+[[package]]
+name = "half"
+version = "2.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "459196ed295495a68f7d7fe1d84f6c4b7ff0e21fe3017b2f283c6fac3ad803c9"
+dependencies = [
+ "cfg-if",
+ "crunchy",
+ "num-traits",
+]
+
+[[package]]
+name = "hashbrown"
+version = "0.15.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3"
+
+[[package]]
+name = "iana-time-zone"
+version = "0.1.63"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8"
+dependencies = [
+ "android_system_properties",
+ "core-foundation-sys",
+ "iana-time-zone-haiku",
+ "js-sys",
+ "log",
+ "wasm-bindgen",
+ "windows-core",
+]
+
+[[package]]
+name = "iana-time-zone-haiku"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
+dependencies = [
+ "cc",
+]
+
+[[package]]
+name = "indexmap"
+version = "2.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e"
+dependencies = [
+ "equivalent",
+ "hashbrown",
+]
+
+[[package]]
+name = "integer-encoding"
+version = "3.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
+
+[[package]]
+name = "itoa"
+version = "1.0.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
+
+[[package]]
+name = "jobserver"
+version = "0.1.33"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a"
+dependencies = [
+ "getrandom 0.3.3",
+ "libc",
+]
+
+[[package]]
+name = "js-sys"
+version = "0.3.77"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f"
+dependencies = [
+ "once_cell",
+ "wasm-bindgen",
+]
+
+[[package]]
+name = "lazy_static"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
+
+[[package]]
+name = "lexical-core"
+version = "1.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b765c31809609075565a70b4b71402281283aeda7ecaf4818ac14a7b2ade8958"
+dependencies = [
+ "lexical-parse-float",
+ "lexical-parse-integer",
+ "lexical-util",
+ "lexical-write-float",
+ "lexical-write-integer",
+]
+
+[[package]]
+name = "lexical-parse-float"
+version = "1.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "de6f9cb01fb0b08060209a057c048fcbab8717b4c1ecd2eac66ebfe39a65b0f2"
+dependencies = [
+ "lexical-parse-integer",
+ "lexical-util",
+ "static_assertions",
+]
+
+[[package]]
+name = "lexical-parse-integer"
+version = "1.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72207aae22fc0a121ba7b6d479e42cbfea549af1479c3f3a4f12c70dd66df12e"
+dependencies = [
+ "lexical-util",
+ "static_assertions",
+]
+
+[[package]]
+name = "lexical-util"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a82e24bf537fd24c177ffbbdc6ebcc8d54732c35b50a3f28cc3f4e4c949a0b3"
+dependencies = [
+ "static_assertions",
+]
+
+[[package]]
+name = "lexical-write-float"
+version = "1.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c5afc668a27f460fb45a81a757b6bf2f43c2d7e30cb5a2dcd3abf294c78d62bd"
+dependencies = [
+ "lexical-util",
+ "lexical-write-integer",
+ "static_assertions",
+]
+
+[[package]]
+name = "lexical-write-integer"
+version = "1.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "629ddff1a914a836fb245616a7888b62903aae58fa771e1d83943035efa0f978"
+dependencies = [
+ "lexical-util",
+ "static_assertions",
+]
+
+[[package]]
+name = "libc"
+version = "0.2.172"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
+
+[[package]]
+name = "libm"
+version = "0.2.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
+
+[[package]]
+name = "libz-rs-sys"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6489ca9bd760fe9642d7644e827b0c9add07df89857b0416ee15c1cc1a3b8c5a"
+dependencies = [
+ "zlib-rs",
+]
+
+[[package]]
+name = "log"
+version = "0.4.27"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
+
+[[package]]
+name = "lz4_flex"
+version = "0.11.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
+dependencies = [
+ "twox-hash 1.6.3",
+]
+
+[[package]]
+name = "memchr"
+version = "2.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
+
+[[package]]
+name = "miniz_oxide"
+version = "0.8.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a"
+dependencies = [
+ "adler2",
+]
+
+[[package]]
+name = "num"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23"
+dependencies = [
+ "num-bigint",
+ "num-complex",
+ "num-integer",
+ "num-iter",
+ "num-rational",
+ "num-traits",
+]
+
+[[package]]
+name = "num-bigint"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
+dependencies = [
+ "num-integer",
+ "num-traits",
+]
+
+[[package]]
+name = "num-complex"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495"
+dependencies = [
+ "num-traits",
+]
+
+[[package]]
+name = "num-integer"
+version = "0.1.46"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f"
+dependencies = [
+ "num-traits",
+]
+
+[[package]]
+name = "num-iter"
+version = "0.1.45"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf"
+dependencies = [
+ "autocfg",
+ "num-integer",
+ "num-traits",
+]
+
+[[package]]
+name = "num-rational"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824"
+dependencies = [
+ "num-bigint",
+ "num-integer",
+ "num-traits",
+]
+
+[[package]]
+name = "num-traits"
+version = "0.2.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
+dependencies = [
+ "autocfg",
+ "libm",
+]
+
+[[package]]
+name = "once_cell"
+version = "1.21.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
+
+[[package]]
+name = "ordered-float"
+version = "2.10.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c"
+dependencies = [
+ "num-traits",
+]
+
+[[package]]
+name = "parquet"
+version = "55.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd31a8290ac5b19f09ad77ee7a1e6a541f1be7674ad410547d5f1eef6eef4a9c"
+dependencies = [
+ "ahash",
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-cast",
+ "arrow-data",
+ "arrow-ipc",
+ "arrow-schema",
+ "arrow-select",
+ "base64",
+ "brotli",
+ "bytes",
+ "chrono",
+ "flate2",
+ "half",
+ "hashbrown",
+ "lz4_flex",
+ "num",
+ "num-bigint",
+ "paste",
+ "seq-macro",
+ "simdutf8",
+ "snap",
+ "thrift",
+ "twox-hash 2.1.0",
+ "zstd",
+]
+
+[[package]]
+name = "paste"
+version = "1.0.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
+
+[[package]]
+name = "pkg-config"
+version = "0.3.32"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
+
+[[package]]
+name = "proc-macro2"
+version = "1.0.95"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778"
+dependencies = [
+ "unicode-ident",
+]
+
+[[package]]
+name = "quote"
+version = "1.0.40"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d"
+dependencies = [
+ "proc-macro2",
+]
+
+[[package]]
+name = "r-efi"
+version = "5.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
+
+[[package]]
+name = "regex"
+version = "1.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-automata",
+ "regex-syntax",
+]
+
+[[package]]
+name = "regex-automata"
+version = "0.4.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-syntax",
+]
+
+[[package]]
+name = "regex-syntax"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
+
+[[package]]
+name = "rustc_version"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
+dependencies = [
+ "semver",
+]
+
+[[package]]
+name = "rustversion"
+version = "1.0.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2"
+
+[[package]]
+name = "ryu"
+version = "1.0.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
+
+[[package]]
+name = "semver"
+version = "1.0.26"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0"
+
+[[package]]
+name = "seq-macro"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc"
+
+[[package]]
+name = "serde"
+version = "1.0.219"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
+dependencies = [
+ "serde_derive",
+]
+
+[[package]]
+name = "serde_derive"
+version = "1.0.219"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "serde_json"
+version = "1.0.140"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373"
+dependencies = [
+ "itoa",
+ "memchr",
+ "ryu",
+ "serde",
+]
+
+[[package]]
+name = "shlex"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
+
+[[package]]
+name = "simdutf8"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e"
+
+[[package]]
+name = "snap"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
+
+[[package]]
+name = "static_assertions"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
+
+[[package]]
+name = "syn"
+version = "2.0.101"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "unicode-ident",
+]
+
+[[package]]
+name = "thrift"
+version = "0.17.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09"
+dependencies = [
+ "byteorder",
+ "integer-encoding",
+ "ordered-float",
+]
+
+[[package]]
+name = "tiny-keccak"
+version = "2.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
+dependencies = [
+ "crunchy",
+]
+
+[[package]]
+name = "twox-hash"
+version = "1.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
+dependencies = [
+ "cfg-if",
+ "static_assertions",
+]
+
+[[package]]
+name = "twox-hash"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7b17f197b3050ba473acf9181f7b1d3b66d1cf7356c6cc57886662276e65908"
+
+[[package]]
+name = "unicode-ident"
+version = "1.0.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
+
+[[package]]
+name = "unicode-segmentation"
+version = "1.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493"
+
+[[package]]
+name = "unicode-width"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd"
+
+[[package]]
+name = "version_check"
+version = "0.9.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
+
+[[package]]
+name = "wasi"
+version = "0.11.0+wasi-snapshot-preview1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
+
+[[package]]
+name = "wasi"
+version = "0.14.2+wasi-0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3"
+dependencies = [
+ "wit-bindgen-rt",
+]
+
+[[package]]
+name = "wasm-bindgen"
+version = "0.2.100"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5"
+dependencies = [
+ "cfg-if",
+ "once_cell",
+ "rustversion",
+ "wasm-bindgen-macro",
+]
+
+[[package]]
+name = "wasm-bindgen-backend"
+version = "0.2.100"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6"
+dependencies = [
+ "bumpalo",
+ "log",
+ "proc-macro2",
+ "quote",
+ "syn",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-macro"
+version = "0.2.100"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407"
+dependencies = [
+ "quote",
+ "wasm-bindgen-macro-support",
+]
+
+[[package]]
+name = "wasm-bindgen-macro-support"
+version = "0.2.100"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+ "wasm-bindgen-backend",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-shared"
+version = "0.2.100"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d"
+dependencies = [
+ "unicode-ident",
+]
+
+[[package]]
+name = "windows-core"
+version = "0.61.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980"
+dependencies = [
+ "windows-implement",
+ "windows-interface",
+ "windows-link",
+ "windows-result",
+ "windows-strings",
+]
+
+[[package]]
+name = "windows-implement"
+version = "0.60.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "windows-interface"
+version = "0.59.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "windows-link"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38"
+
+[[package]]
+name = "windows-result"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c64fd11a4fd95df68efcfee5f44a294fe71b8bc6a91993e2791938abcc712252"
+dependencies = [
+ "windows-link",
+]
+
+[[package]]
+name = "windows-strings"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97"
+dependencies = [
+ "windows-link",
+]
+
+[[package]]
+name = "wit-bindgen-rt"
+version = "0.39.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1"
+dependencies = [
+ "bitflags",
+]
+
+[[package]]
+name = "zerocopy"
+version = "0.7.35"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
+dependencies = [
+ "zerocopy-derive",
+]
+
+[[package]]
+name = "zerocopy-derive"
+version = "0.7.35"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "zlib-rs"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "868b928d7949e09af2f6086dfc1e01936064cc7a819253bce650d4e2a2d63ba8"
+
+[[package]]
+name = "zstd"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a"
+dependencies = [
+ "zstd-safe",
+]
+
+[[package]]
+name = "zstd-safe"
+version = "7.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d"
+dependencies = [
+ "zstd-sys",
+]
+
+[[package]]
+name = "zstd-sys"
+version = "2.0.15+zstd.1.5.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237"
+dependencies = [
+ "cc",
+ "pkg-config",
+]
diff --git a/native-engine/blaze-cudf-bridge/Cargo.toml b/native-engine/blaze-cudf-bridge/Cargo.toml
new file mode 100644
index 0000000..5f6d5bc
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "blaze-cudf-bridge"
+version = "0.1.0"
+edition = "2021"
+resolver = "1"
+
+[dependencies]
+arrow = { workspace = true }
+datafusion = { workspace = true }
+datafusion-ext-commons = { workspace = true }
+parquet = { workspace = true }
+log = "0.4.27"
+num = "0.4.3"
+paste = "1.0.15"
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/.clang-format b/native-engine/blaze-cudf-bridge/bridge-cpp/.clang-format
new file mode 100644
index 0000000..5dc170d
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/.clang-format
@@ -0,0 +1,20 @@
+BasedOnStyle: Chromium
+
+## custom style
+AccessModifierOffset: 0
+AlignAfterOpenBracket: AlwaysBreak
+AllowAllParametersOfDeclarationOnNextLine: true
+AllowShortBlocksOnASingleLine: false
+AllowShortFunctionsOnASingleLine: false
+BinPackArguments: true
+BinPackParameters: OnePerLine
+BreakConstructorInitializers: AfterColon
+BreakTemplateDeclarations: true
+ColumnLimit: 100
+ContinuationIndentWidth: 4
+IncludeBlocks: Merge
+IndentAccessModifiers: false
+IndentCaseBlocks: false
+IndentWidth: 4
+InsertBraces: true
+InsertNewlineAtEOF: true
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/CMakeLists.txt b/native-engine/blaze-cudf-bridge/bridge-cpp/CMakeLists.txt
new file mode 100644
index 0000000..f7cad96
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/CMakeLists.txt
@@ -0,0 +1,25 @@
+cmake_minimum_required(VERSION 3.15)
+# vcpkg settings must precede project(): the vcpkg toolchain file is loaded by that call
+set(VCPKG_BUILD_TYPE release)
+set(VCPKG_OVERLAY_PORTS "${CMAKE_CURRENT_LIST_DIR}/vcpkg_ports")
+project(blaze-cudf-bridge)
+set(CMAKE_CXX_STANDARD 17)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+find_package(Arrow CONFIG REQUIRED)
+find_package(CUDAToolkit REQUIRED)
+find_package(cudf REQUIRED)
+find_package(spdlog CONFIG REQUIRED)
+
+include_directories("${CMAKE_SOURCE_DIR}")
+file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS "src/*.cpp")
+list(FILTER SOURCES EXCLUDE REGEX ".*_test\\.cpp$")
+add_library(${PROJECT_NAME} SHARED ${SOURCES})
+
+target_link_libraries(${PROJECT_NAME}
+    PRIVATE
+    Arrow::arrow_static
+    CUDA::cudart
+    cudf::cudf
+    spdlog::spdlog
+)
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/CMakePresets.json b/native-engine/blaze-cudf-bridge/bridge-cpp/CMakePresets.json
new file mode 100644
index 0000000..c394626
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/CMakePresets.json
@@ -0,0 +1,15 @@
+{
+  "version": 2,
+  "configurePresets": [
+    {
+      "name": "vcpkg",
+      "generator": "Ninja",
+      "binaryDir": "${sourceDir}/build",
+      "cacheVariables": {
+        "CMAKE_TOOLCHAIN_FILE": "$env{VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake",
+        "VCPKG_INSTALLED_DIR": "${sourceDir}/vcpkg_installed",
+        "VCPKG_INSTALL_OPTIONS": "--allow-unsupported"
+      }
+    }
+  ]
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/build.sh b/native-engine/blaze-cudf-bridge/bridge-cpp/build.sh
new file mode 100755
index 0000000..5caa859
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/build.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+
+set -x
+set -e
+
+ROOT_DIR="$(cd "$(dirname "$0")" && pwd)"
+cd "$ROOT_DIR"  # the preset, overlay ports and build dir below are relative to this script
+if [ ! -d "$VCPKG_ROOT" ]; then
+    echo "invalid VCPKG_ROOT: $VCPKG_ROOT" >&2
+    exit 1
+fi
+
+cmake --preset=vcpkg \
+  -DCMAKE_BUILD_TYPE=Release \
+  -DVCPKG_BUILD_TYPE=release \
+  -DVCPKG_OVERLAY_PORTS="$ROOT_DIR/vcpkg_ports" \
+  -DCMAKE_EXPORT_COMPILE_COMMANDS=1
+
+cmake --build build
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/expr.cpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/expr.cpp
new file mode 100644
index 0000000..7491537
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/expr.cpp
@@ -0,0 +1,80 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "expr.hpp"
+#include <cudf/ast/expressions.hpp>
+#include <cudf/scalar/scalar.hpp>
+#include <memory>
+#include <stdexcept>
+
+size_t ExprBuilder::add_column_reference(size_t column_idx) {
+    auto expr_id = tree.size();
+    tree.emplace<cudf::ast::column_reference>(column_idx);
+    return expr_id;
+}
+
+size_t ExprBuilder::add_column_name_reference(std::string const& name) {
+    auto expr_id = tree.size();
+    tree.emplace<cudf::ast::column_name_reference>(name);
+    return expr_id;
+}
+
+size_t ExprBuilder::add_unary_operation(size_t arg1_expr_id, int op) {
+    auto const& arg1 = tree.at(arg1_expr_id);
+    auto expr_id = tree.size();
+    tree.emplace<cudf::ast::operation>((cudf::ast::ast_operator)op, arg1);
+    return expr_id;
+}
+
+size_t ExprBuilder::add_binary_operation(size_t arg1_expr_id, size_t arg2_expr_id, int op) {
+    auto const& arg1 = tree.at(arg1_expr_id);
+    auto const& arg2 = tree.at(arg2_expr_id);
+    auto expr_id = tree.size();
+    tree.emplace<cudf::ast::operation>((cudf::ast::ast_operator)op, arg1, arg2);
+    return expr_id;
+}
+
+size_t ExprBuilder::add_literal(std::shared_ptr<cudf::scalar> scalar) {
+    auto& scalar_value = *scalar.get();
+    auto&& type_id = typeid(scalar_value);
+    auto expr_id = tree.size();
+
+#define handle_numeric_type(c_ty)                                                                  \
+    if (type_id == typeid(cudf::numeric_scalar<c_ty>)) {                                           \
+        tree.emplace<cudf::ast::literal>(dynamic_cast<cudf::numeric_scalar<c_ty>&>(scalar_value)); \
+        scalar_pool.push_back((scalar));                                                           \
+        return expr_id;                                                                            \
+    }
+    handle_numeric_type(int8_t);
+    handle_numeric_type(int16_t);
+    handle_numeric_type(int32_t);
+    handle_numeric_type(int64_t);
+    handle_numeric_type(uint8_t);
+    handle_numeric_type(uint16_t);
+    handle_numeric_type(uint32_t);
+    handle_numeric_type(uint64_t);
+    handle_numeric_type(float);
+    handle_numeric_type(double);
+
+    if (type_id == typeid(cudf::string_scalar)) {
+        tree.emplace<cudf::ast::literal>(dynamic_cast<cudf::string_scalar&>(scalar_value));
+        scalar_pool.push_back(scalar);
+        return expr_id;
+    }
+    throw std::runtime_error(std::string("unsupported scalar type: ") + type_id.name());
+}
+
+cudf::ast::expression const& ExprBuilder::expr() const {
+    return tree.back();
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/expr.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/expr.hpp
new file mode 100644
index 0000000..b852847
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/expr.hpp
@@ -0,0 +1,34 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <cudf/ast/expressions.hpp>
+#include <cudf/scalar/scalar.hpp>
+#include <memory>
+
+class ExprBuilder {
+    public:
+    size_t add_unary_operation(size_t arg1_expr_id, int op);
+    size_t add_binary_operation(size_t arg1_expr_id, size_t arg2_expr_id, int op);
+    size_t add_column_reference(size_t column_idx);
+    size_t add_column_name_reference(std::string const& name);
+    size_t add_literal(std::shared_ptr<cudf::scalar> scalar);
+
+    cudf::ast::expression const& expr() const;
+
+    private:
+    cudf::ast::tree tree;
+    std::vector<std::shared_ptr<cudf::scalar>> scalar_pool;
+};
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/ffi.cpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/ffi.cpp
new file mode 100644
index 0000000..7ceead7
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/ffi.cpp
@@ -0,0 +1,360 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <arrow/c/abi.h>
+#include <arrow/c/bridge.h>
+#include <arrow/result.h>
+#include <arrow/type.h>
+#include <spdlog/spdlog.h>
+#include <cstdint>
+#include <cudf/interop.hpp>
+#include <cudf/io/datasource.hpp>
+#include <cudf/scalar/scalar.hpp>
+#include <cudf/table/table.hpp>
+#include <cudf/table/table_view.hpp>
+#include <cudf/utilities/error.hpp>
+#include <memory>
+#include <stdexcept>
+#include <vector>
+#include "src/expr.hpp"
+#include "src/plan/filter.hpp"
+#include "src/plan/parquet_scan.hpp"
+#include "src/plan/plan.hpp"
+#include "src/plan/project.hpp"
+#include "src/plan/split_table.hpp"
+#include "src/plan/union.hpp"
+
+template <typename T>
+struct alignas(8) FFIObjWrapper {
+    FFIObjWrapper(T inner) : inner(std::move(inner)) {
+    }
+    T inner;
+};
+
+template <typename T>
+struct alignas(8) FFIResult {
+    FFIResult(bool ok, T value) : ok(ok), value(std::move(value)) {
+    }
+    bool ok;
+    T value;
+};
+
+template <>
+struct alignas(8) FFIResult<void> {
+    FFIResult(bool ok) : ok(ok) {
+    }
+    bool ok;
+};
+
+#define define_ffi_obj_wrapped_type(name, ty)       \
+    using name = FFIObjWrapper<ty>*;                \
+    extern "C" void ffi_##name##_delete(name ptr) { \
+        delete ptr;                                 \
+    }
+
+define_ffi_obj_wrapped_type(FFI_PlanPtr, std::shared_ptr<Plan>);
+define_ffi_obj_wrapped_type(FFI_TableStreamPtr, std::unique_ptr<TableStream>);
+define_ffi_obj_wrapped_type(FFI_TablePtr, std::unique_ptr<OwnedTable>);
+define_ffi_obj_wrapped_type(FFI_ScalarPtr, std::shared_ptr<cudf::scalar>);
+define_ffi_obj_wrapped_type(FFI_ExprBuilderPtr, std::shared_ptr<ExprBuilder>);
+
+#define catch_exceptions_and_return(ret) /* catches by const&: no slicing */ \
+    catch (cudf::logic_error const& err) {                                   \
+        spdlog::error("cudf logic error({}): {}", __func__, err.what());     \
+        return (ret);                                                        \
+    }                                                                        \
+    catch (cudf::data_type_error const& err) {                               \
+        spdlog::error("cudf data type error({}): {}", __func__, err.what()); \
+        std::cerr << err.stacktrace();                                       \
+        return (ret);                                                        \
+    }                                                                        \
+    catch (cudf::cuda_error const& err) {                                    \
+        spdlog::error("cudf cuda error({}): {}", __func__, err.what());      \
+        std::cerr << err.stacktrace();                                       \
+        return (ret);                                                        \
+    }                                                                        \
+    catch (std::runtime_error const& err) {                                  \
+        spdlog::error("runtime error({}): {}", __func__, err.what());        \
+        return (ret);                                                        \
+    }                                                                        \
+    catch (std::exception const& err) {                                      \
+        spdlog::error("exception({}): {}", __func__, err.what());            \
+        return (ret);                                                        \
+    }
+
+struct alignas(8) FFI_CudfDataSource {  // callback table adapted into a cudf::io::datasource
+    void (*dtor)(FFI_CudfDataSource* self);
+    size_t (*data_size)(FFI_CudfDataSource const* self);
+    FFIResult<size_t> (*read)(
+        FFI_CudfDataSource* self, size_t offset, uint8_t* buf, size_t buf_size);
+    void* raw;
+
+    std::unique_ptr<cudf::io::datasource> to_cudf_io_datasource() {
+        struct Wrapper : public cudf::io::datasource {
+            public:
+            FFI_CudfDataSource inner;  // copy of the callback table; released via dtor below
+
+            Wrapper(FFI_CudfDataSource wrapped) : inner(wrapped) {
+            }
+
+            ~Wrapper() {
+                inner.dtor(&inner);
+            }
+
+            std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) {
+                auto buffer = std::vector<uint8_t>(size);
+                size_t filled = 0;  // bytes read so far
+
+                while (filled < size) {  // a read may be short: resume after the filled prefix
+                    auto read_size = host_read(offset + filled, size - filled, &buffer[filled]);
+                    if (read_size == 0) {
+                        throw std::runtime_error("got unexpected EOF while reading datasource");
+                    }
+                    filled += read_size;
+                }
+                return owning_buffer<std::vector<uint8_t>>::create(std::move(buffer));
+            }
+
+            size_t host_read(size_t offset, size_t size, uint8_t* dst) {
+                auto read_size_ret = inner.read(&inner, offset, dst, size);
+                if (!read_size_ret.ok) {
+                    throw std::runtime_error("error reading datasource");
+                }
+                return read_size_ret.value;  // may be less than `size`
+            }
+
+            size_t size() const {
+                return inner.data_size(&inner);
+            }
+        };
+        return std::make_unique<Wrapper>(*this);
+    }
+};
+
+extern "C" FFI_PlanPtr ffi_new_split_table_plan(
+    FFI_PlanPtr input_plan,
+    size_t batch_size) {
+    try {
+        auto plan = std::make_shared<SplitTablePlan>(input_plan->inner, batch_size);
+        return new FFIObjWrapper((std::shared_ptr<Plan>&&)std::move(plan));
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" FFI_PlanPtr ffi_new_union_plan(
+    FFI_PlanPtr* input_plans_ptr,
+    size_t input_plans_len,
+    ArrowSchema* ffi_schema) {
+    try {
+        auto input_plans = std::vector<std::shared_ptr<Plan>>();
+        for (size_t i = 0; i < input_plans_len; i++) {
+            input_plans.push_back(input_plans_ptr[i]->inner);
+        }
+        // on a failed import, throw so the handlers below log the Arrow status and return nullptr
+        auto schema_ret = arrow::ImportSchema(ffi_schema);
+        if (!schema_ret.ok()) {
+            throw std::runtime_error(schema_ret.status().ToString());
+        }
+        auto schema = schema_ret.ValueUnsafe();
+
+        auto plan = std::make_shared<UnionPlan>(std::move(input_plans), schema);
+        return new FFIObjWrapper((std::shared_ptr<Plan>&&)std::move(plan));
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" FFI_PlanPtr ffi_new_parquet_scan_plan(
+    FFI_CudfDataSource* ffi_datasource,
+    FFI_ExprBuilderPtr expr,
+    size_t read_offset,
+    size_t read_size,
+    ArrowSchema* ffi_read_schema) {
+    try {
+        auto datasource = ffi_datasource->to_cudf_io_datasource();
+        // on a failed import, throw so the handlers below log the Arrow status and return nullptr
+        auto read_schema_ret = arrow::ImportSchema(ffi_read_schema);
+        if (!read_schema_ret.ok()) {
+            throw std::runtime_error(read_schema_ret.status().ToString());
+        }
+        auto read_schema = read_schema_ret.ValueUnsafe();
+        auto plan = std::make_shared<ParquetScanPlan>(
+            std::move(datasource), expr->inner, read_offset, read_size, read_schema);
+        return new FFIObjWrapper((std::shared_ptr<Plan>&&)std::move(plan));
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" FFI_PlanPtr ffi_new_project_plan(
+    FFI_PlanPtr input_plan,
+    FFI_ExprBuilderPtr* exprs_ptr,
+    size_t exprs_len,
+    ArrowSchema* ffi_schema) {
+    try {
+        // on a failed import, throw so the handlers below log the Arrow status and return nullptr
+        auto schema_ret = arrow::ImportSchema(ffi_schema);
+        if (!schema_ret.ok()) {
+            throw std::runtime_error(schema_ret.status().ToString());
+        }
+        auto schema = schema_ret.MoveValueUnsafe();
+        auto exprs = std::vector<std::shared_ptr<ExprBuilder>>();
+        for (size_t i = 0; i < exprs_len; i++) {
+            exprs.push_back(exprs_ptr[i]->inner);
+        }
+        auto plan = std::make_shared<ProjectPlan>(input_plan->inner, exprs, schema);
+        return new FFIObjWrapper((std::shared_ptr<Plan>&&)std::move(plan));
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" FFI_PlanPtr ffi_new_filter_plan(FFI_PlanPtr input_plan, FFI_ExprBuilderPtr expr) {
+    try {
+        auto plan = std::make_shared<FilterPlan>(input_plan->inner, expr->inner);
+        return new FFIObjWrapper((std::shared_ptr<Plan>&&)std::move(plan));
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" FFIResult<FFI_TableStreamPtr> ffi_execute_plan(FFI_PlanPtr plan) {
+    try {
+        spdlog::info("cudf executing plan:\n{}", desc_plan_tree(plan->inner));
+        auto table_stream_ret = plan->inner->execute();
+        if (!table_stream_ret.ok()) {  // throw so the handlers below log the Arrow status
+            throw std::runtime_error(table_stream_ret.status().ToString());
+        }
+        return FFIResult(true, new FFIObjWrapper(std::move(table_stream_ret.ValueUnsafe())));
+    }
+    catch_exceptions_and_return(FFIResult(false, (FFI_TableStreamPtr) nullptr));
+}
+
+extern "C" FFIResult<bool> ffi_table_stream_has_next(FFI_TableStreamPtr table_stream) {
+    try {
+        auto has_next_ret = table_stream->inner->has_next();
+        if (!has_next_ret.ok()) {  // throw so the handlers below log the Arrow status
+            throw std::runtime_error(has_next_ret.status().ToString());
+        }
+        return FFIResult(true, has_next_ret.ValueUnsafe());
+    }
+    catch_exceptions_and_return(FFIResult(false, false));
+}
+
+extern "C" FFIResult<FFI_TablePtr> ffi_table_stream_next(FFI_TableStreamPtr table_stream) {
+    try {
+        auto next_ret = table_stream->inner->next();
+        if (!next_ret.ok()) {  // throw so the handlers below log the Arrow status
+            throw std::runtime_error(next_ret.status().ToString());
+        }
+        return FFIResult(
+            true, new FFIObjWrapper(std::make_unique<OwnedTable>(next_ret.MoveValueUnsafe())));
+    }
+    catch_exceptions_and_return(FFIResult(false, (FFI_TablePtr) nullptr));
+}
+
+extern "C" FFIResult<void> ffi_table_to_arrow(
+    FFI_TablePtr table, ArrowSchema* ffi_schema, ArrowArray* ffi_array) {
+    try {
+        // every Arrow failure is thrown so the handlers below log its status and return ok=false
+        auto schema_ret = arrow::ImportType(ffi_schema);
+        if (!schema_ret.ok()) {
+            throw std::runtime_error(schema_ret.status().ToString());
+        }
+        auto schema = schema_ret.ValueUnsafe();
+        auto device_table = cudf::to_arrow_host(table->inner->view());
+        auto arrow_table_ret = arrow::ImportDeviceArray(device_table.get(), schema);
+        if (!arrow_table_ret.ok()) {
+            throw std::runtime_error(arrow_table_ret.status().ToString());
+        }
+        auto arrow_table = arrow_table_ret.ValueUnsafe();
+        auto export_status = arrow::ExportArray(*arrow_table, ffi_array);
+        if (!export_status.ok()) {
+            throw std::runtime_error(export_status.ToString());
+        }
+        return FFIResult<void>(true);
+    }
+    catch_exceptions_and_return(FFIResult<void>(false));
+}
+
+#define ffi_def_numeric_scalar(rust_ty, c_ty)                                             \
+    extern "C" FFI_ScalarPtr ffi_##rust_ty##_scalar(c_ty value) {                         \
+        try {                                                                             \
+            auto scalar = std::make_shared<cudf::numeric_scalar<c_ty>>(value);            \
+            return new FFIObjWrapper((std::shared_ptr<cudf::scalar>&&)std::move(scalar)); \
+        }                                                                                 \
+        catch_exceptions_and_return(nullptr);                                             \
+    }
+
+ffi_def_numeric_scalar(i8, int8_t);
+ffi_def_numeric_scalar(i16, int16_t);
+ffi_def_numeric_scalar(i32, int32_t);
+ffi_def_numeric_scalar(i64, int64_t);
+ffi_def_numeric_scalar(u8, uint8_t);
+ffi_def_numeric_scalar(u16, uint16_t);
+ffi_def_numeric_scalar(u32, uint32_t);
+ffi_def_numeric_scalar(u64, uint64_t);
+ffi_def_numeric_scalar(f32, float);
+ffi_def_numeric_scalar(f64, double);
+
+extern "C" FFI_ScalarPtr ffi_string_scalar(char const* value) {
+    try {
+        auto scalar = std::make_shared<cudf::string_scalar>(std::string(value));
+        return new FFIObjWrapper((std::shared_ptr<cudf::scalar>&&)std::move(scalar));
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" FFI_ExprBuilderPtr ffi_new_expr_builder() {
+    try {
+        return new FFIObjWrapper(std::make_shared<ExprBuilder>());
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" size_t ffi_expr_builder_add_literal(
+    FFI_ExprBuilderPtr expr_builder, FFI_ScalarPtr scalar_value) {
+    try {
+        return expr_builder->inner->add_literal(scalar_value->inner);
+    }
+    catch_exceptions_and_return(-1);
+}
+
+extern "C" size_t ffi_expr_builder_add_column_reference(
+    FFI_ExprBuilderPtr expr_builder, size_t column_idx) {
+    try {
+        return expr_builder->inner->add_column_reference(column_idx);
+    }
+    catch_exceptions_and_return(-1);
+}
+
+extern "C" size_t ffi_expr_builder_add_column_name_reference(
+    FFI_ExprBuilderPtr expr_builder, char const* name) {
+    try {
+        return expr_builder->inner->add_column_name_reference(name);
+    }
+    catch_exceptions_and_return(-1);
+}
+
+extern "C" size_t ffi_expr_builder_add_unary_expr(
+    FFI_ExprBuilderPtr expr_builder, size_t arg1_expr_id, int32_t op) {
+    try {
+        return expr_builder->inner->add_unary_operation(arg1_expr_id, op);
+    }
+    catch_exceptions_and_return(-1);
+}
+
+extern "C" size_t ffi_expr_builder_add_binary_expr(
+    FFI_ExprBuilderPtr expr_builder, size_t arg1_expr_id, size_t arg2_expr_id, int32_t op) {
+    try {
+        return expr_builder->inner->add_binary_operation(arg1_expr_id, arg2_expr_id, op);
+    }
+    catch_exceptions_and_return(-1);
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/filter.cpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/filter.cpp
new file mode 100644
index 0000000..74c44b4
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/filter.cpp
@@ -0,0 +1,61 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "filter.hpp"
+#include <arrow/result.h>
+#include <cudf/stream_compaction.hpp>
+#include <cudf/transform.hpp>
+#include <memory>
+#include <ostream>
+#include "src/plan/plan.hpp"
+#include "src/util.hpp"
+
+class FilterStream : public TableStream {
+    public:
+    FilterStream(std::unique_ptr<TableStream> input, std::shared_ptr<ExprBuilder> expr) :
+        input(std::move(input)), expr(expr) {
+    }
+
+    arrow::Result<bool> has_next() {
+        return input->has_next();
+    }
+
+    arrow::Result<OwnedTable> next() {
+        auto next_input = ARROW_TRY_RESULT(input->next());
+        auto filtered_mask = cudf::compute_column(next_input.view(), expr->expr());
+        auto filtered_table = cudf::apply_boolean_mask(next_input.view(), *filtered_mask);
+        return arrow::Result(std::move(filtered_table));
+    }
+
+    private:
+    std::unique_ptr<TableStream> input;
+    std::shared_ptr<ExprBuilder> expr;
+};
+
+FilterPlan::FilterPlan(std::shared_ptr<Plan> input, std::shared_ptr<ExprBuilder> expr) :
+    input(input), expr(expr) {
+}
+
+void FilterPlan::desc(std::ostream& oss) const {
+    oss << "CudfFilter []";
+}
+
+std::shared_ptr<arrow::Schema> FilterPlan::schema() const {
+    return input->schema();
+}
+
+arrow::Result<std::unique_ptr<TableStream>> FilterPlan::execute() const {
+    auto input_stream = ARROW_TRY_RESULT(input->execute());
+    return arrow::Result(std::make_unique<FilterStream>(std::move(input_stream), expr));
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/filter.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/filter.hpp
new file mode 100644
index 0000000..d4bfb80
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/filter.hpp
@@ -0,0 +1,41 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <arrow/type.h>
+#include <cudf/ast/expressions.hpp>
+#include <memory>
+#include <ostream>
+#include <vector>
+#include "src/expr.hpp"
+#include "src/plan/plan.hpp"
+
+class FilterPlan : public Plan {
+    public:
+    FilterPlan(std::shared_ptr<Plan> input, std::shared_ptr<ExprBuilder> expr);
+
+    public:
+    void desc(std::ostream& oss) const;
+    std::shared_ptr<arrow::Schema> schema() const;
+    arrow::Result<std::unique_ptr<TableStream>> execute() const;
+
+    inline std::vector<std::shared_ptr<Plan>> children() const {
+        return std::vector{input};
+    }
+
+    private:
+    std::shared_ptr<Plan> input;
+    std::shared_ptr<ExprBuilder> expr;
+};
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/parquet_scan.cpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/parquet_scan.cpp
new file mode 100644
index 0000000..81a703f
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/parquet_scan.cpp
@@ -0,0 +1,159 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "parquet_scan.hpp"
+#include <arrow/result.h>
+#include <arrow/type.h>
+#include <arrow/type_fwd.h>
+#include <spdlog/fmt/ranges.h>
+#include <spdlog/spdlog.h>
+#include <algorithm>
+#include <cstring>
+#include <cudf/ast/expressions.hpp>
+#include <cudf/column/column.hpp>
+#include <cudf/io/datasource.hpp>
+#include <cudf/io/parquet.hpp>
+#include <cudf/io/parquet_metadata.hpp>
+#include <cudf/io/types.hpp>
+#include <cudf/types.hpp>
+#include <memory>
+#include <optional>
+#include <regex>
+#include <stdexcept>
+#include <utility>
+#include <vector>
+#include "src/expr.hpp"
+#include "src/plan/plan.hpp"
+
+class ParquetScanTableStream : public TableStream {  // TableStream over a chunked_parquet_reader
+    public:
+    ParquetScanTableStream(std::shared_ptr<cudf::io::chunked_parquet_reader> reader) :
+        reader(reader) {
+        // according to the cudf docs, the first chunk always exists
+        auto first_table = reader->read_chunk().tbl;
+        if (first_table->num_rows() > 0) {  // an empty first chunk is dropped, not buffered
+            next_table = std::make_optional(std::move(first_table));
+        }
+    }
+
+    arrow::Result<bool> has_next() {
+        if (!next_table.has_value() && reader->has_next()) {  // nothing buffered: read a chunk
+            next_table = std::make_optional(std::move(reader->read_chunk().tbl));
+        }
+        return arrow::Result(next_table.has_value());
+    }
+
+    arrow::Result<OwnedTable> next() {
+        auto p = std::move(next_table).value();  // throws if no table is buffered
+        next_table = std::nullopt;
+        return arrow::Result(std::move(p));
+    }
+
+    private:
+    std::shared_ptr<cudf::io::chunked_parquet_reader> reader;
+    std::optional<std::unique_ptr<cudf::table>> next_table;  // chunk read ahead, if any
+};
+
+ParquetScanPlan::ParquetScanPlan(
+    std::unique_ptr<cudf::io::datasource> datasource,
+    std::shared_ptr<ExprBuilder> predicate_filter_expr,
+    size_t read_offset,
+    size_t read_size,
+    std::shared_ptr<arrow::Schema> schema) :
+    datasource(std::move(datasource)),
+    predicate_filter_expr(predicate_filter_expr),
+    read_offset(read_offset),
+    read_size(read_size),
+    output_schema(schema) {
+}
+
+void ParquetScanPlan::desc(std::ostream& oss) const {
+    oss << "CudfParquetScan [";
+    oss << std::regex_replace(output_schema->ToString(), std::regex("\n"), ", ");
+    oss << "]";
+}
+
+std::shared_ptr<arrow::Schema> ParquetScanPlan::schema() const {
+    return output_schema;
+}
+
+// Builds one reader_column_schema per top-level parquet column, in file order. A column whose
+// case-insensitively matching output field is STRING gets binary-to-string conversion enabled.
+inline static std::vector<cudf::io::reader_column_schema> build_reader_column_schema(
+    arrow::Schema const& output_schema, cudf::io::parquet_metadata const& metadata) {
+    auto column_schema = std::vector<cudf::io::reader_column_schema>();
+    auto& fields = output_schema.fields();
+
+    for (auto const& col : metadata.schema().root().children()) {
+        auto field = std::find_if(fields.begin(), fields.end(), [&](auto const& f) {
+            return strcasecmp(f->name().c_str(), col.name().c_str()) == 0;
+        });
+        if (field != fields.end() && (*field)->type()->id() == arrow::Type::STRUCT) {
+            throw std::runtime_error("complex type not implemented");
+        }
+        auto& entry = column_schema.emplace_back();  // default for unmatched/other columns
+        if (field != fields.end() && (*field)->type()->id() == arrow::Type::STRING) {
+            entry.set_convert_binary_to_strings(true);
+        }
+    }
+    return column_schema;
+}
+
+arrow::Result<std::unique_ptr<TableStream>> ParquetScanPlan::execute() const {
+    auto datasource_total_size = datasource->size();
+    auto source_info = cudf::io::source_info(&*datasource);
+    auto metadata = cudf::io::read_parquet_metadata(source_info);
+
+    // calculate row group ids to read
+    auto read_row_group_ids = std::vector<cudf::size_type>();
+    auto num_row_groups = metadata.num_rowgroups();
+    auto file_total_byte_size = (uint64_t)0;
+    for (auto i = 0; i < num_row_groups; i++) {
+        file_total_byte_size += metadata.rowgroup_metadata()[i].at("total_byte_size");
+    }
+    auto start = (uint64_t)0;  // running end of row group i, in summed total_byte_size units
+    for (auto i = 0; file_total_byte_size > 0 && i < num_row_groups; i++) {
+        start += metadata.rowgroup_metadata()[i].at("total_byte_size");
+        // scale via the ratio: start * datasource_total_size can overflow 64 bits
+        auto mapped_end = static_cast<uint64_t>(
+            static_cast<long double>(start) / file_total_byte_size * datasource_total_size);
+        if (mapped_end > read_offset && mapped_end <= read_offset + read_size) {
+            read_row_group_ids.push_back(i);
+        }
+    }
+    if (read_row_group_ids.empty()) {  // no row groups to read
+        return TableStream::empty_stream();
+    }
+    spdlog::info("reading parquet row groups: {}", read_row_group_ids);
+    spdlog::info("reading parquet fields: {}", output_schema->field_names());
+
+    auto reader_column_schema = build_reader_column_schema(*this->output_schema, metadata);
+    auto options_builder =
+        cudf::io::parquet_reader_options_builder(source_info)
+            .row_groups(std::vector<std::vector<cudf::size_type>>{std::move(read_row_group_ids)})
+            .set_column_schema(reader_column_schema)
+            .columns(output_schema->field_names())
+            .allow_mismatched_pq_schemas(true);
+
+    // set optional predicate filter
+    auto const& pred = predicate_filter_expr->expr();
+    if (typeid(pred) == typeid(cudf::ast::operation)) {
+        options_builder.filter(pred);
+    }
+    auto options = options_builder.build();
+
+    auto chunk_bytes_size = 16777216;
+    auto reader = std::make_shared<cudf::io::chunked_parquet_reader>(chunk_bytes_size, options);
+    return std::make_unique<ParquetScanTableStream>(reader);
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/parquet_scan.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/parquet_scan.hpp
new file mode 100644
index 0000000..b14b378
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/parquet_scan.hpp
@@ -0,0 +1,47 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <arrow/type.h>
+#include <cudf/ast/expressions.hpp>
+#include <cudf/io/datasource.hpp>
+#include <functional>
+#include <memory>
+#include <optional>
+#include <ostream>
+#include "src/expr.hpp"
+#include "src/plan/plan.hpp"
+
+class ParquetScanPlan : public Plan {  // Plan node that scans a parquet datasource
+    public:
+    ParquetScanPlan(
+        std::unique_ptr<cudf::io::datasource> datasource,
+        std::shared_ptr<ExprBuilder> predicate_filter_expr,
+        size_t read_offset,
+        size_t read_size,
+        std::shared_ptr<arrow::Schema> schema);
+    // Plan interface; `override` lets the compiler check each signature against the base.
+    void desc(std::ostream& oss) const override;
+    std::shared_ptr<arrow::Schema> schema() const override;
+    arrow::Result<std::unique_ptr<TableStream>> execute() const override;
+
+    private:
+    std::unique_ptr<cudf::io::datasource> datasource;
+    std::shared_ptr<ExprBuilder> predicate_filter_expr;  // optional filter handed to the reader
+    size_t read_offset;  // start of the datasource range whose row groups are read
+    size_t read_size;    // length of that range
+    std::shared_ptr<arrow::Schema> output_schema;  // its field names select the columns read
+    std::optional<std::reference_wrapper<cudf::ast::expression const>> bound_predicate_filter;
+};
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/plan.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/plan.hpp
new file mode 100644
index 0000000..852bd9d
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/plan.hpp
@@ -0,0 +1,126 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <arrow/result.h>
+#include <arrow/type.h>
+#include <cudf/table/table.hpp>
+#include <cudf/table/table_view.hpp>
+#include <memory>
+#include <optional>
+#include <ostream>
+#include <sstream>
+#include <stack>
+#include <stdexcept>
+#include <vector>
+
+// A table handle: either owns a cudf::table or stores a view together with a shared holder.
+class OwnedTable {
+    public:
+    // Takes ownership of `owned`.
+    inline OwnedTable(std::unique_ptr<cudf::table> owned) : owned(std::move(owned)) {
+    }
+    // Wraps `referenced`, storing `holder` alongside it.
+    inline OwnedTable(std::shared_ptr<OwnedTable> holder, cudf::table_view referenced) :
+        referenced(std::in_place, holder, referenced) {
+    }
+    inline OwnedTable(OwnedTable&& moving) :
+        owned(std::move(moving.owned)), referenced(std::move(moving.referenced)) {
+    }
+
+    // View of the owned table when present, otherwise the stored referenced view.
+    inline cudf::table_view view() const {
+        return owned.has_value() ? owned.value()->view() : referenced.value().second;
+    }
+
+    private:
+    std::optional<std::unique_ptr<cudf::table>> owned;
+    std::optional<std::pair<std::shared_ptr<OwnedTable>, cudf::table_view>> referenced;
+};
+
+// Abstract stream of tables, consumed through has_next() / next().
+class TableStream {
+    public:
+    TableStream() = default;
+    virtual ~TableStream() = default;
+
+    // True when next() can produce another table.
+    virtual arrow::Result<bool> has_next() = 0;
+    // Produces the next table of the stream.
+    virtual arrow::Result<OwnedTable> next() = 0;
+
+    // Builds a stream that reports no tables at all.
+    inline static std::unique_ptr<TableStream> empty_stream() {
+        class EmptyTableStream : public TableStream {
+            public:
+            inline arrow::Result<bool> has_next() override {
+                return arrow::Result<bool>(false);
+            }
+            inline arrow::Result<OwnedTable> next() override {
+                // has_next() is always false, so a call here is a caller bug.
+                throw std::runtime_error("unreachable: EmptyTableStream has no tables");
+            }
+        };
+        return std::make_unique<EmptyTableStream>();
+    }
+};
+
+// Abstract base class for plan nodes.
+class Plan {
+    public:
+    Plan() = default;
+    virtual ~Plan() = default;
+
+    virtual void desc(std::ostream& oss) const = 0;
+    virtual std::shared_ptr<arrow::Schema> schema() const = 0;
+    // Returns the stream of tables produced by this node.
+    virtual arrow::Result<std::unique_ptr<TableStream>> execute() const = 0;
+
+    // Child plans; nodes that do not override this report none.
+    virtual std::vector<std::shared_ptr<Plan>> children() const {
+        return {};
+    }
+};
+
+// Renders the plan tree as text: one node per line in pre-order, two spaces per level.
+inline static std::string desc_plan_tree(std::shared_ptr<Plan> plan) {
+    struct plan_tree_node {
+        std::shared_ptr<Plan> plan;
+        int level;
+    };
+    auto dfs_stack = std::stack<plan_tree_node>();
+    auto nodes = std::vector<plan_tree_node>{};
+
+    dfs_stack.push(plan_tree_node{plan, 0});
+    while (!dfs_stack.empty()) {
+        auto node = dfs_stack.top();
+        nodes.push_back(node);
+        dfs_stack.pop();
+        // The stack is LIFO: push children last-to-first so they are visited (and
+        // therefore printed) in the order children() returns them.
+        auto children = node.plan->children();
+        for (auto child = children.rbegin(); child != children.rend(); ++child) {
+            dfs_stack.push(plan_tree_node{*child, node.level + 1});
+        }
+    }
+
+    std::ostringstream oss;
+    for (const auto& node : nodes) {
+        oss << std::string(2 * node.level, ' ');
+        node.plan->desc(oss);
+        oss << std::endl;
+    }
+    return oss.str();
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/project.cpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/project.cpp
new file mode 100644
index 0000000..a6c3b97
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/project.cpp
@@ -0,0 +1,82 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "project.hpp"
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <cudf/column/column.hpp>
+#include <cudf/table/table.hpp>
+#include <cudf/transform.hpp>
+#include <memory>
+#include <ostream>
+#include <regex>
+#include <vector>
+#include "src/expr.hpp"
+#include "src/plan/plan.hpp"
+#include "src/util.hpp"
+
+// Stream that evaluates the projection expressions against every table of its input.
+class ProjectStream : public TableStream {
+    public:
+    ProjectStream(
+        std::unique_ptr<TableStream> input,
+        const std::vector<std::shared_ptr<ExprBuilder>>& exprs) :
+        input(std::move(input)), exprs(exprs) {
+    }
+
+    // Forwards to the input stream.
+    arrow::Result<bool> has_next() {
+        return input->has_next();
+    }
+
+    // Computes one output column per expression from the next input table.
+    arrow::Result<OwnedTable> next() {
+        auto source = ARROW_TRY_RESULT(input->next());
+        auto source_view = source.view();
+        auto projected = std::vector<std::unique_ptr<cudf::column>>();
+        projected.reserve(exprs.size());
+        for (const auto& expr : exprs) {
+            projected.push_back(cudf::compute_column(source_view, expr->expr()));
+        }
+        auto result_table = std::make_unique<cudf::table>(std::move(projected));
+        return arrow::Result(std::move(result_table));
+    }
+
+    private:
+    std::unique_ptr<TableStream> input;
+    std::vector<std::shared_ptr<ExprBuilder>> exprs;
+};
+
+ProjectPlan::ProjectPlan(
+    std::shared_ptr<Plan> input,
+    std::vector<std::shared_ptr<ExprBuilder>> exprs,
+    std::shared_ptr<arrow::Schema> schema) :
+    input(input), exprs(exprs), output_schema(schema) {
+}
+// Writes "CudfProject [...]" with the schema text's newlines replaced by ", ".
+void ProjectPlan::desc(std::ostream& oss) const {
+    const auto schema_text = output_schema->ToString();
+    oss << "CudfProject [" << std::regex_replace(schema_text, std::regex("\n"), ", ") << "]";
+}
+
+std::shared_ptr<arrow::Schema> ProjectPlan::schema() const {
+    return output_schema;
+}
+
+// Executes the input plan and wraps its stream in a ProjectStream.
+arrow::Result<std::unique_ptr<TableStream>> ProjectPlan::execute() const {
+    auto upstream = ARROW_TRY_RESULT(input->execute());
+    return arrow::Result(std::make_unique<ProjectStream>(std::move(upstream), exprs));
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/project.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/project.hpp
new file mode 100644
index 0000000..5a2c0e0
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/project.hpp
@@ -0,0 +1,44 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <arrow/type.h>
+#include <cudf/ast/expressions.hpp>
+#include <memory>
+#include <vector>
+#include "src/expr.hpp"
+#include "src/plan/plan.hpp"
+
+class ProjectPlan : public Plan {  // Plan node computing `exprs` over its input's tables
+    public:
+    ProjectPlan(
+        std::shared_ptr<Plan> input,
+        std::vector<std::shared_ptr<ExprBuilder>> exprs,
+        std::shared_ptr<arrow::Schema> schema);
+
+    // Plan interface; `override` lets the compiler check each signature against the base.
+    std::shared_ptr<arrow::Schema> schema() const override;
+    arrow::Result<std::unique_ptr<TableStream>> execute() const override;
+    void desc(std::ostream& oss) const override;
+
+    inline std::vector<std::shared_ptr<Plan>> children() const override {
+        return std::vector{input};
+    }
+
+    private:
+    std::shared_ptr<Plan> input;
+    std::vector<std::shared_ptr<ExprBuilder>> exprs;  // one expression per output column
+    std::shared_ptr<arrow::Schema> output_schema;     // returned by schema()
+};
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/split_table.cpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/split_table.cpp
new file mode 100644
index 0000000..e6dffb4
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/split_table.cpp
@@ -0,0 +1,96 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "split_table.hpp"
+#include <algorithm>
+#include <cudf/copying.hpp>
+#include <cudf/table/table.hpp>
+#include <cudf/table/table_view.hpp>
+#include <cudf/types.hpp>
+#include <deque>
+#include <memory>
+#include "src/plan/plan.hpp"
+#include "src/plan/split_table.hpp"
+#include "src/util.hpp"
+
+// Re-chunks its input so that no emitted table holds more than `batch_size` rows.
+class SplitTableStream : public TableStream {
+    public:
+    SplitTableStream(std::unique_ptr<TableStream> input_stream, size_t batch_size) :
+        input_stream(std::move(input_stream)), batch_size(batch_size) {
+    }
+
+    // Refills the buffer from the next non-empty input table; loops rather than recursing.
+    arrow::Result<bool> has_next() {
+        while (buffered_tables.empty()) {
+            auto input_has_next = ARROW_TRY_RESULT(input_stream->has_next());
+            if (!input_has_next) {
+                break;
+            }
+            auto input_next = ARROW_TRY_RESULT(input_stream->next());
+            auto input_num_rows = size_t(input_next.view().num_rows());
+            if (input_num_rows == 0) {
+                continue;  // process next input table
+            }
+            // Round both divisions up so that no piece exceeds batch_size rows.
+            auto num_splits = (input_num_rows + batch_size - 1) / batch_size;
+            auto rows_per_split = (input_num_rows + num_splits - 1) / num_splits;
+            if (num_splits == 1) {
+                buffered_tables.push_back(std::move(input_next));
+            } else {
+                auto split_indices = std::vector<cudf::size_type>();
+                for (auto start = size_t(0); start < input_num_rows;) {
+                    auto split_end = std::min(start + rows_per_split, input_num_rows);
+                    split_indices.push_back(cudf::size_type(start));
+                    split_indices.push_back(cudf::size_type(split_end));
+                    start = split_end;
+                }
+                auto holder = std::make_shared<OwnedTable>(std::move(input_next));
+                for (auto splitted : cudf::slice(holder->view(), split_indices)) {
+                    buffered_tables.emplace_back(holder, splitted);
+                }
+            }
+        }
+        return !buffered_tables.empty();
+    }
+
+    // Pops the oldest buffered table; call only after has_next() returned true.
+    arrow::Result<OwnedTable> next() {
+        auto front = std::move(buffered_tables.front());
+        buffered_tables.pop_front();
+        return front;
+    }
+
+    private:
+    std::unique_ptr<TableStream> input_stream;
+    size_t batch_size;
+    std::deque<OwnedTable> buffered_tables;
+};
+
+SplitTablePlan::SplitTablePlan(std::shared_ptr<Plan> input, size_t batch_size) :
+    input(input), batch_size(batch_size) {
+}
+// The schema is whatever the input plan reports.
+std::shared_ptr<arrow::Schema> SplitTablePlan::schema() const {
+    return input->schema();
+}
+// Executes the input plan and wraps its stream in a SplitTableStream.
+arrow::Result<std::unique_ptr<TableStream>> SplitTablePlan::execute() const {
+    auto upstream = ARROW_TRY_RESULT(input->execute());
+    return arrow::Result(std::make_unique<SplitTableStream>(std::move(upstream), batch_size));
+}
+
+void SplitTablePlan::desc(std::ostream& oss) const {
+    oss << "SplitTable []";
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/split_table.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/split_table.hpp
new file mode 100644
index 0000000..556dcab
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/split_table.hpp
@@ -0,0 +1,40 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <arrow/type.h>
+#include <cudf/ast/expressions.hpp>
+#include <memory>
+#include <vector>
+#include "src/plan/plan.hpp"
+
+class SplitTablePlan : public Plan {  // Plan node that re-chunks its input by batch_size
+    public:
+    SplitTablePlan(std::shared_ptr<Plan> input, size_t batch_size);
+
+    // Plan interface; `override` lets the compiler check each signature against the base.
+    std::shared_ptr<arrow::Schema> schema() const override;
+    arrow::Result<std::unique_ptr<TableStream>> execute() const override;
+    void desc(std::ostream& oss) const override;
+
+    inline std::vector<std::shared_ptr<Plan>> children() const override {
+        return std::vector{input};
+    }
+
+    private:
+    std::shared_ptr<Plan> input;
+    size_t batch_size;  // passed to the SplitTableStream created by execute()
+};
+
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/util.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/util.hpp
new file mode 100644
index 0000000..1d4d374
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/util.hpp
@@ -0,0 +1,25 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#define ARROW_TRY_RESULT(result)                                          \
+    ({ /* yields the value of `result`, or returns its error Status */    \
+        auto arrow_try_result_value_ = std::move(result);                 \
+        ARROW_RETURN_NOT_OK(arrow_try_result_value_);                     \
+        arrow_try_result_value_.MoveValueUnsafe();                        \
+    })
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg.json b/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg.json
new file mode 100644
index 0000000..a50ed56
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg.json
@@ -0,0 +1,10 @@
+{
+  "name": "blaze-cudf-bridge",
+  "version": "0.1.0",
+  "dependencies": [
+    "arrow",
+    "cuda",
+    "libcudf",
+    "spdlog"
+  ]
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg_ports/libcudf/portfile.cmake b/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg_ports/libcudf/portfile.cmake
new file mode 100644
index 0000000..cefdccb
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg_ports/libcudf/portfile.cmake
@@ -0,0 +1,49 @@
+vcpkg_from_github(  # download the cudf sources; SOURCE_PATH receives the extracted location
+    OUT_SOURCE_PATH SOURCE_PATH
+    REPO rapidsai/cudf
+    REF "branch-25.06"  # NOTE(review): a branch ref can move, which would break the pinned SHA512
+    SHA512 49fb54e301e070308d2d3be933c61b249b8e67073dbe36db597950aaff84f61d14038ece830e0d61e322d415547b45cb11b37155fd6606f7c5edf1667363c2e4
+    HEAD_REF main
+)
+
+# fail early when nvcc cannot be run (probed with `nvcc --version`)
+execute_process(
+    COMMAND nvcc --version
+    OUTPUT_VARIABLE NVCC_VERSION_OUTPUT
+    ERROR_VARIABLE NVCC_VERSION_ERROR
+    RESULT_VARIABLE NVCC_VERSION_RESULT
+)
+if(NOT ${NVCC_VERSION_RESULT} EQUAL 0)
+    message(FATAL_ERROR "execute nvcc --version failed: ${NVCC_VERSION_ERROR}")
+endif()
+
+set(BUILD_ARGS --disable_large_strings --cmake-args="-DCUDA_STATIC_RUNTIME=OFF -DBUILD_SHARED_LIBS=OFF")
+set(ENV{INSTALL_PREFIX} "${CURRENT_PACKAGES_DIR}")  # exported to the build.sh runs below
+
+# build (the trailing -n is presumably build.sh's no-install switch — TODO confirm)
+vcpkg_execute_required_process(
+    COMMAND "${SOURCE_PATH}/build.sh" libcudf ${BUILD_ARGS} -n
+    WORKING_DIRECTORY "${SOURCE_PATH}"
+    LOGNAME "build-${TARGET_TRIPLET}"
+)
+
+# patch for installation: prefixes lines 230-231 of the fetched nvcomp.cmake with '#'.
+vcpkg_execute_required_process(  # NOTE(review): line-number based, tied to the rapids-cmake revision
+  COMMAND sed -i "230,231s/^/#/" "./cpp/build/_deps/rapids-cmake-src/rapids-cmake/cpm/nvcomp.cmake"
+  WORKING_DIRECTORY "${SOURCE_PATH}"
+  LOGNAME "build-${TARGET_TRIPLET}"
+)
+file(MAKE_DIRECTORY "${CURRENT_PACKAGES_DIR}/usr/lib64")
+
+# install: the same build.sh invocation as the build step, this time without -n
+vcpkg_execute_required_process(
+    COMMAND "${SOURCE_PATH}/build.sh" libcudf ${BUILD_ARGS}
+    WORKING_DIRECTORY "${SOURCE_PATH}"
+    LOGNAME "build-${TARGET_TRIPLET}"
+)
+
+# fix some file conflicts: give the zstd headers found in include/ a cudf- prefix
+file(RENAME "${CURRENT_PACKAGES_DIR}/include/zdict.h" "${CURRENT_PACKAGES_DIR}/include/cudf-zdict.h")
+file(RENAME "${CURRENT_PACKAGES_DIR}/include/zstd.h" "${CURRENT_PACKAGES_DIR}/include/cudf-zstd.h")
+file(RENAME "${CURRENT_PACKAGES_DIR}/include/zstd_errors.h" "${CURRENT_PACKAGES_DIR}/include/cudf-zstd_errors.h")
+
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg_ports/libcudf/vcpkg.json b/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg_ports/libcudf/vcpkg.json
new file mode 100644
index 0000000..2115af5
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg_ports/libcudf/vcpkg.json
@@ -0,0 +1,12 @@
+{
+  "name": "libcudf",
+  "version": "25.6",
+  "description": "GPU DataFrame Library for Apache Arrow",
+  "homepage": "https://github.com/rapidsai/cudf",
+  "license": "Apache-2.0",
+  "dependencies": [
+    "vcpkg-cmake",
+    "cuda"
+  ],
+  "supports": "linux | windows | osx"
+}
diff --git a/native-engine/blaze-cudf-bridge/build.rs b/native-engine/blaze-cudf-bridge/build.rs
new file mode 100644
index 0000000..e50b2d1
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/build.rs
@@ -0,0 +1,7 @@
+fn main() { // emits the Cargo directives that link the blaze-cudf-bridge dylib
+    let lib_path = "native-engine/blaze-cudf-bridge/bridge-cpp/build";
+    println!("cargo::rustc-link-search=native={lib_path}");
+    println!("cargo::rustc-link-arg=-Wl,-rpath,$ORIGIN/{lib_path}");
+    println!("cargo::rustc-link-lib=dylib=blaze-cudf-bridge");
+    println!("cargo::rerun-if-changed=bridge-cpp/build"); // resolved against this package's root
+}
diff --git a/native-engine/blaze-cudf-bridge/src/exprs.rs b/native-engine/blaze-cudf-bridge/src/exprs.rs
new file mode 100644
index 0000000..a6a7772
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/exprs.rs
@@ -0,0 +1,112 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::TypeId, ffi::CString, str::FromStr};
+
+use num::ToPrimitive;
+
+use crate::{ffi::*, AsRawPtrAddr};
+
+pub type CudfExprOperator = FFI_AstOperator;
+
+pub struct CudfExprBuilder {
+    expr_builder: FFI_ExprBuilderPtr, // handle obtained from ffi_new_expr_builder()
+}
+
+impl AsRawPtrAddr for CudfExprBuilder {
+    fn as_raw_ptr_addr(&self) -> usize {
+        self.expr_builder.as_raw_ptr_addr()
+    }
+}
+
+impl CudfExprBuilder {
+    pub fn new() -> Self {
+        let expr_builder = unsafe { ffi_new_expr_builder() };
+        Self { expr_builder }
+    }
+    // Each add_* method forwards to the bridge and returns the usize it reports.
+    pub fn add_literal(&self, scalar: &CudfScalar) -> usize {
+        let builder = self.as_raw_ptr_addr();
+        unsafe { ffi_expr_builder_add_literal(builder, scalar.as_raw_ptr_addr()) }
+    }
+
+    pub fn add_column_ref(&self, column_idx: usize) -> usize {
+        let builder = self.as_raw_ptr_addr();
+        unsafe { ffi_expr_builder_add_column_reference(builder, column_idx) }
+    }
+
+    pub fn add_column_name_ref(&self, name: &str) -> usize {
+        let c_name = CString::from_str(name).unwrap(); // panics on an interior NUL byte
+        let builder = self.as_raw_ptr_addr();
+        unsafe { ffi_expr_builder_add_column_name_reference(builder, c_name.as_ptr()) }
+    }
+
+    pub fn add_unary(&self, arg1_expr_id: usize, op: CudfExprOperator) -> usize {
+        let builder = self.as_raw_ptr_addr();
+        unsafe { ffi_expr_builder_add_unary_expr(builder, arg1_expr_id, op) }
+    }
+
+    pub fn add_binary(
+        &self,
+        arg1_expr_id: usize,
+        arg2_expr_id: usize,
+        op: CudfExprOperator,
+    ) -> usize {
+        let builder = self.as_raw_ptr_addr();
+        unsafe { ffi_expr_builder_add_binary_expr(builder, arg1_expr_id, arg2_expr_id, op) }
+    }
+}
+
+pub struct CudfScalar {
+    scalar: FFI_ScalarPtr,
+}
+
+impl AsRawPtrAddr for CudfScalar {
+    fn as_raw_ptr_addr(&self) -> usize {
+        self.scalar.as_raw_ptr_addr()
+    }
+}
+
+impl CudfScalar {
+    /// Creates a scalar from a primitive numeric value; panics for any unsupported `T`.
+    pub fn new_prim<T: Copy + ToPrimitive + 'static>(value: T) -> Self {
+        let typeid = TypeId::of::<T>();
+        let scalar = unsafe {
+            match () {
+                _ if typeid == TypeId::of::<i8>() => ffi_i8_scalar(value.to_i8().unwrap()),
+                _ if typeid == TypeId::of::<i16>() => ffi_i16_scalar(value.to_i16().unwrap()),
+                _ if typeid == TypeId::of::<i32>() => ffi_i32_scalar(value.to_i32().unwrap()),
+                _ if typeid == TypeId::of::<i64>() => ffi_i64_scalar(value.to_i64().unwrap()),
+                _ if typeid == TypeId::of::<u8>() => ffi_u8_scalar(value.to_u8().unwrap()),
+                _ if typeid == TypeId::of::<u16>() => ffi_u16_scalar(value.to_u16().unwrap()),
+                _ if typeid == TypeId::of::<u32>() => ffi_u32_scalar(value.to_u32().unwrap()),
+                _ if typeid == TypeId::of::<u64>() => ffi_u64_scalar(value.to_u64().unwrap()),
+                _ if typeid == TypeId::of::<f32>() => ffi_f32_scalar(value.to_f32().unwrap()),
+                _ if typeid == TypeId::of::<f64>() => ffi_f64_scalar(value.to_f64().unwrap()),
+                _ => unimplemented!("fixed width scalar of type {}", std::any::type_name::<T>()),
+            }
+        };
+        Self { scalar }
+    }
+
+    pub fn new_string(value: &str) -> Self {
+        let c_str = CString::from_str(value).unwrap(); // panics on an interior NUL byte
+        let scalar = unsafe { ffi_string_scalar(c_str.as_ptr()) };
+        Self { scalar }
+    }
+
+    pub fn new_bool(value: bool) -> Self {
+        Self::new_prim(value as i8) // NOTE(review): i8 scalar (1 / 0), not a bool one — confirm
+    }
+}
diff --git a/native-engine/blaze-cudf-bridge/src/ffi.rs b/native-engine/blaze-cudf-bridge/src/ffi.rs
new file mode 100644
index 0000000..6e06ef8
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/ffi.rs
@@ -0,0 +1,256 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use paste::paste;
+
+use crate::io::CudfDataSource;
+
+macro_rules! define_ffi_obj_wrapped_type { // declares a handle type named `$tyname`
+    ($tyname:ident) => {
+        #[repr(C, align(8))]
+        #[allow(non_camel_case_types)]
+        pub struct $tyname {
+            raw_ptr_addr: usize, // address of the wrapped object, stored as an integer
+        }
+        // Read-only access to the stored address, for passing to the ffi_* functions.
+        impl $tyname {
+            #[allow(unused)]
+            pub fn as_raw_ptr_addr(&self) -> usize {
+                return self.raw_ptr_addr;
+            }
+        }
+        // Dropping the handle calls the external `ffi_<$tyname>_delete` with the address.
+        impl Drop for $tyname {
+            fn drop(&mut self) {
+                unsafe {
+                    paste! {
+                        extern "C" {
+                            fn [<ffi_ $tyname _delete>] (raw_ptr_addr: usize) -> ();
+                        }
+                        [<ffi_ $tyname _delete>] (self.raw_ptr_addr);
+                    }
+                }
+            }
+        }
+    };
+}
+
+macro_rules! define_ffi_result_type { // declares a C-layout `{ ok, value }` result struct
+    ($tyname:ident, $t:ty) => {
+        #[repr(C, align(4))]
+        #[allow(non_camel_case_types)]
+        pub struct $tyname {
+            pub ok: bool,  // when false, into_result yields an error
+            pub value: $t, // handed out by into_result only when `ok` is true
+        }
+        // Conversion into an arrow Result; the caller supplies the error message.
+        impl $tyname {
+            #[allow(unused)]
+            pub fn into_result(self, errmsg: String) -> arrow::error::Result<$t> {
+                if !self.ok {
+                    return Err(arrow::error::ArrowError::ComputeError(errmsg));
+                }
+                Ok(self.value)
+            }
+        }
+    };
+}
+
+define_ffi_obj_wrapped_type!(FFI_PlanPtr); // each handle type needs an extern ffi_<Name>_delete
+define_ffi_obj_wrapped_type!(FFI_TableStreamPtr);
+define_ffi_obj_wrapped_type!(FFI_TablePtr);
+define_ffi_obj_wrapped_type!(FFI_ScalarPtr);
+define_ffi_obj_wrapped_type!(FFI_ExprBuilderPtr);
+// Result structs used as return types by the ffi_* functions and the data-source read callback.
+define_ffi_result_type!(FFI_VoidResult, ());
+define_ffi_result_type!(FFI_BoolResult, bool);
+define_ffi_result_type!(FFI_TableStreamPtrResult, FFI_TableStreamPtr);
+define_ffi_result_type!(FFI_TableResult, FFI_TablePtr);
+define_ffi_result_type!(FFI_USizeResult, usize);
+
+/// Returns the slice's start pointer together with its element count.
+/// The pointer is only meaningful while the borrowed slice is alive.
+pub fn slice_to_const_ptr_and_len<T>(vec: &[T]) -> (*const T, usize) {
+    (vec.as_ptr(), vec.len())
+}
+
+/// Returns the mutable slice's start pointer together with its element count.
+/// The pointer is only meaningful while the borrowed slice is alive.
+pub fn slice_to_mut_ptr_and_len<T>(vec: &mut [T]) -> (*mut T, usize) {
+    (vec.as_mut_ptr(), vec.len())
+}
+
+#[allow(unused)]
+extern "C" { // functions expected from the linked blaze-cudf-bridge library
+    pub fn ffi_new_parquet_scan_plan(
+        data_source: *mut FFI_CudfDataSource,
+        predicate_filter_expr_ptr_addr: usize,
+        read_offset: usize,
+        read_size: usize,
+        schema: *mut FFI_ArrowSchema,
+    ) -> FFI_PlanPtr;
+
+    pub fn ffi_new_split_table_plan(input_plan_ptr_addr: usize, batch_size: usize) -> FFI_PlanPtr;
+
+    pub fn ffi_new_union_plan(
+        input_plan_ptr_addrs: *const usize,
+        input_plan_len: usize,
+        schema: *mut FFI_ArrowSchema,
+    ) -> FFI_PlanPtr;
+
+    pub fn ffi_new_project_plan(
+        input_plan_ptr_addr: usize,
+        exprs_ptr_addrs: *const usize,
+        exprs_len: usize,
+        schema: *mut FFI_ArrowSchema,
+    ) -> FFI_PlanPtr;
+
+    pub fn ffi_new_filter_plan(input_plan_ptr_addr: usize, expr_ptr_addr: usize) -> FFI_PlanPtr;
+    // Execution calls return `{ ok, value }` structs; see into_result on the result types.
+    pub fn ffi_execute_plan(plan_ptr_addr: usize) -> FFI_TableStreamPtrResult;
+
+    pub fn ffi_table_stream_has_next(table_stream_ptr_addr: usize) -> FFI_BoolResult;
+
+    pub fn ffi_table_stream_next(table_stream_ptr_addr: usize) -> FFI_TableResult;
+
+    pub fn ffi_table_to_arrow(
+        table_ptr_addr: usize,
+        ffi_schema: *mut FFI_ArrowSchema,
+        ffi_array: *mut FFI_ArrowArray,
+    ) -> FFI_VoidResult;
+    // Expression building: the *_ptr_addr arguments carry addresses from as_raw_ptr_addr().
+    pub fn ffi_new_expr_builder() -> FFI_ExprBuilderPtr;
+    // Scalar constructors, one per literal type.
+    pub fn ffi_i8_scalar(value: i8) -> FFI_ScalarPtr;
+    pub fn ffi_i16_scalar(value: i16) -> FFI_ScalarPtr;
+    pub fn ffi_i32_scalar(value: i32) -> FFI_ScalarPtr;
+    pub fn ffi_i64_scalar(value: i64) -> FFI_ScalarPtr;
+    pub fn ffi_u8_scalar(value: u8) -> FFI_ScalarPtr;
+    pub fn ffi_u16_scalar(value: u16) -> FFI_ScalarPtr;
+    pub fn ffi_u32_scalar(value: u32) -> FFI_ScalarPtr;
+    pub fn ffi_u64_scalar(value: u64) -> FFI_ScalarPtr;
+    pub fn ffi_f32_scalar(value: f32) -> FFI_ScalarPtr;
+    pub fn ffi_f64_scalar(value: f64) -> FFI_ScalarPtr;
+    pub fn ffi_string_scalar(value: *const std::ffi::c_char) -> FFI_ScalarPtr;
+    // Each add_* call returns a usize (presumably the id of the new expression — TODO confirm).
+    pub fn ffi_expr_builder_add_literal(
+        expr_builder_ptr_addr: usize,
+        scalar_ptr_addr: usize,
+    ) -> usize;
+    pub fn ffi_expr_builder_add_column_reference(
+        expr_builder_ptr_addr: usize,
+        column_idx: usize,
+    ) -> usize;
+    pub fn ffi_expr_builder_add_column_name_reference(
+        expr_builder_ptr_addr: usize,
+        name: *const std::ffi::c_char,
+    ) -> usize;
+    pub fn ffi_expr_builder_add_unary_expr(
+        expr_builder_ptr_addr: usize,
+        arg1_expr_id: usize,
+        op: FFI_AstOperator,
+    ) -> usize;
+    pub fn ffi_expr_builder_add_binary_expr(
+        expr_builder_ptr_addr: usize,
+        arg1_expr_id: usize,
+        arg2_expr_id: usize,
+        op: FFI_AstOperator,
+    ) -> usize;
+}
+
+#[repr(i32)]
+#[allow(non_camel_case_types)]
+#[allow(unused)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum FFI_AstOperator {
+    // Copied from <cudf/ast/expressions.hpp>; passed by value over FFI, so keep the order in sync.
+    // Binary operators
+    ADD = 0,   // operator +
+    SUB,       // operator -
+    MUL,       // operator *
+    DIV,       // operator / using common type of lhs and rhs
+    TRUE_DIV,  // operator / after promoting type to floating point
+    // FLOOR_DIV: operator / after promoting to 64 bit floating point and then flooring the result
+    FLOOR_DIV,
+    MOD,   // operator %
+    PYMOD, // operator % using Python's sign rules for negatives
+    POW,   // lhs ^ rhs
+    EQUAL, // operator ==
+    // NULL_EQUAL: operator == with Spark rules: NULL_EQUAL(null, null) is true,
+    // NULL_EQUAL(null, valid) is false, and
+    // NULL_EQUAL(valid, valid) == EQUAL(valid, valid)
+    NULL_EQUAL,
+    NOT_EQUAL,        // operator !=
+    LESS,             // operator <
+    GREATER,          // operator >
+    LESS_EQUAL,       // operator <=
+    GREATER_EQUAL,    // operator >=
+    BITWISE_AND,      // operator &
+    BITWISE_OR,       // operator |
+    BITWISE_XOR,      // operator ^
+    LOGICAL_AND,      // operator &&
+    // NULL_LOGICAL_AND: operator && with Spark rules: NULL_LOGICAL_AND(null, null) is null,
+    // NULL_LOGICAL_AND(null, true) is null, NULL_LOGICAL_AND(null, false) is false, and
+    // NULL_LOGICAL_AND(valid, valid) == LOGICAL_AND(valid, valid)
+    NULL_LOGICAL_AND,
+    LOGICAL_OR,      // operator ||
+    // NULL_LOGICAL_OR: operator || with Spark rules: NULL_LOGICAL_OR(null, null) is null,
+    // NULL_LOGICAL_OR(null, true) is true, NULL_LOGICAL_OR(null, false) is null, and
+    // NULL_LOGICAL_OR(valid, valid) == LOGICAL_OR(valid, valid)
+    NULL_LOGICAL_OR,
+    // Unary operators
+    IDENTITY,        // Identity function
+    IS_NULL,         // Check if operand is null
+    SIN,             // Trigonometric sine
+    COS,             // Trigonometric cosine
+    TAN,             // Trigonometric tangent
+    ARCSIN,          // Trigonometric sine inverse
+    ARCCOS,          // Trigonometric cosine inverse
+    ARCTAN,          // Trigonometric tangent inverse
+    SINH,            // Hyperbolic sine
+    COSH,            // Hyperbolic cosine
+    TANH,            // Hyperbolic tangent
+    ARCSINH,         // Hyperbolic sine inverse
+    ARCCOSH,         // Hyperbolic cosine inverse
+    ARCTANH,         // Hyperbolic tangent inverse
+    EXP,             // Exponential (base e, Euler number)
+    LOG,             // Natural Logarithm (base e)
+    SQRT,            // Square-root (x^0.5)
+    CBRT,            // Cube-root (x^(1.0/3))
+    CEIL,            // Smallest integer value not less than arg
+    FLOOR,           // Largest integer value not greater than arg
+    ABS,             // Absolute value
+    RINT,            // Rounds the floating-point argument arg to an integer value
+    BIT_INVERT,      // Bitwise Not (~)
+    NOT,             // Logical Not (!)
+    CAST_TO_INT64,   // Cast value to int64_t
+    CAST_TO_UINT64,  // Cast value to uint64_t
+    CAST_TO_FLOAT64, // Cast value to double
+}
+
+#[repr(C, align(8))]
+#[allow(non_camel_case_types)]
+pub struct FFI_CudfDataSource { // C-layout table of callbacks wrapping a Rust CudfDataSource
+    pub dtor: extern "C" fn(this: *mut FFI_CudfDataSource) -> (), // releases `raw`
+    pub data_size: extern "C" fn(this: *const FFI_CudfDataSource) -> usize,
+    pub read: extern "C" fn(
+        this: *mut FFI_CudfDataSource,
+        offset: usize,
+        buf: *mut u8,
+        buf_size: usize,
+    ) -> FFI_USizeResult,
+    // Boxed trait object the callbacks operate on; the dtor in io.rs rebuilds the Box to drop it.
+    pub raw: *mut Box<dyn CudfDataSource>,
+}
diff --git a/native-engine/blaze-cudf-bridge/src/io.rs b/native-engine/blaze-cudf-bridge/src/io.rs
new file mode 100644
index 0000000..0dde996
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/io.rs
@@ -0,0 +1,153 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{
+    fs::File,
+    io::{Read, Seek},
+};
+
+use arrow::error::Result;
+use parquet::file::reader::Length;
+
+use crate::ffi::{FFI_CudfDataSource, FFI_USizeResult};
+
+/// A random-access byte source that the cudf bridge reads data from.
+pub trait CudfDataSource {
+    /// Total size of the underlying data in bytes.
+    fn data_size(&self) -> usize;
+
+    /// Reads bytes starting at `offset` into `buf` and returns how many bytes
+    /// were written to `buf`.
+    fn read(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize>;
+}
+
+/// Conversion of a data source into its C-ABI callback table.
+pub trait IntoFFICudfDataSource {
+    fn into_ffi(self) -> FFI_CudfDataSource;
+}
+
+impl IntoFFICudfDataSource for Box<dyn CudfDataSource> {
+    /// Moves the boxed data source behind a raw pointer and returns the
+    /// callback table operating on it. The allocation is released only when
+    /// the `dtor` callback is invoked.
+    fn into_ffi(self) -> FFI_CudfDataSource {
+        /// Frees the boxed data source stored in `raw`.
+        extern "C" fn dtor(this: *mut FFI_CudfDataSource) {
+            unsafe {
+                let this_mut = this.as_mut().expect("FFI_CudfDataSource is null");
+                // Take the pointer out first so a repeated dtor call trips the
+                // null check below instead of freeing the same box twice.
+                let raw = std::mem::replace(&mut this_mut.raw, std::ptr::null_mut());
+                assert!(!raw.is_null(), "FFI_CudfDataSource.raw is null");
+                drop(Box::from_raw(raw));
+            }
+        }
+
+        /// Returns the size reported by the wrapped data source.
+        extern "C" fn data_size(this: *const FFI_CudfDataSource) -> usize {
+            unsafe {
+                let this_ref = this.as_ref().expect("FFI_CudfDataSource is null");
+                let raw_ref = this_ref
+                    .raw
+                    .as_ref()
+                    .expect("FFI_CudfDataSource.raw is null");
+                raw_ref.data_size()
+            }
+        }
+
+        /// Reads up to `buf_size` bytes at `offset` into `buf`. A failure is
+        /// reported as `ok == false`; the error itself cannot cross the C ABI,
+        /// so it is logged here instead of being dropped silently.
+        extern "C" fn read(
+            this: *mut FFI_CudfDataSource,
+            offset: usize,
+            buf: *mut u8,
+            buf_size: usize,
+        ) -> FFI_USizeResult {
+            unsafe {
+                let this_mut = this.as_mut().expect("FFI_CudfDataSource is null");
+                let raw_mut = this_mut
+                    .raw
+                    .as_mut()
+                    .expect("FFI_CudfDataSource.raw is null");
+                if buf.is_null() {
+                    // slice::from_raw_parts_mut requires a non-null pointer even
+                    // for an empty slice; only a zero-length read can succeed.
+                    return FFI_USizeResult {
+                        ok: buf_size == 0,
+                        value: 0,
+                    };
+                }
+                let buf_mut = std::slice::from_raw_parts_mut(buf, buf_size);
+                match raw_mut.read(offset, buf_mut) {
+                    Ok(read_size) => FFI_USizeResult {
+                        ok: true,
+                        value: read_size,
+                    },
+                    Err(err) => {
+                        log::warn!("FFI_CudfDataSource read error at offset {offset}: {err}");
+                        FFI_USizeResult {
+                            ok: false,
+                            value: 0,
+                        }
+                    }
+                }
+            }
+        }
+
+        FFI_CudfDataSource {
+            dtor,
+            data_size,
+            read,
+            raw: Box::into_raw(Box::new(self)),
+        }
+    }
+}
+
+/// Data source backed by a file on the local filesystem.
+pub struct CudfLocalFileDataSource {
+    file_path: String,
+    file: File,
+}
+
+impl CudfLocalFileDataSource {
+    /// Opens `file_path` for reading.
+    pub fn try_new(file_path: String) -> Result<Self> {
+        let file = File::open(&file_path)?;
+        Ok(Self { file_path, file })
+    }
+}
+
+impl CudfDataSource for CudfLocalFileDataSource {
+    fn data_size(&self) -> usize {
+        self.file.len() as usize
+    }
+
+    /// Seeks to `offset` and fills `buf` until it is full or end-of-file is
+    /// reached, so the returned count is short only at end-of-file.
+    fn read(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
+        self.file.seek(std::io::SeekFrom::Start(offset as u64))?;
+        let mut total = 0;
+        while total < buf.len() {
+            // A single read() may legally return fewer bytes than requested.
+            match self.file.read(&mut buf[total..]) {
+                Ok(0) => break,
+                Ok(n) => total += n,
+                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {}
+                Err(err) => return Err(err.into()),
+            }
+        }
+        Ok(total)
+    }
+}
diff --git a/native-engine/blaze-cudf-bridge/src/lib.rs b/native-engine/blaze-cudf-bridge/src/lib.rs
new file mode 100644
index 0000000..28f4812
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/lib.rs
@@ -0,0 +1,25 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod exprs;
+pub mod io;
+pub mod plans;
+pub mod tables;
+
+#[allow(unused)]
+mod ffi;
+
+pub trait AsRawPtrAddr {
+    fn as_raw_ptr_addr(&self) -> usize;
+}
diff --git a/native-engine/blaze-cudf-bridge/src/plans/filter.rs b/native-engine/blaze-cudf-bridge/src/plans/filter.rs
new file mode 100644
index 0000000..7653dd1
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/plans/filter.rs
@@ -0,0 +1,47 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use arrow::error::Result;
+
+use crate::{exprs::CudfExprBuilder, ffi::*, plans::CudfPlan, AsRawPtrAddr};
+
+/// Handle to a native filter plan created through the cudf bridge.
+pub struct CudfFilterPlan {
+    plan: FFI_PlanPtr,
+}
+
+impl AsRawPtrAddr for CudfFilterPlan {
+    /// Address of the wrapped native plan handle.
+    fn as_raw_ptr_addr(&self) -> usize {
+        self.plan.as_raw_ptr_addr()
+    }
+}
+
+impl CudfPlan for CudfFilterPlan {}
+
+impl CudfFilterPlan {
+    /// Builds the native plan via `ffi_new_filter_plan` from the addresses of
+    /// `input_plan` and `expr`.
+    ///
+    /// NOTE(review): `input_plan` is dropped when this function returns;
+    /// presumably the native plan keeps its child alive -- confirm.
+    pub fn try_new(input_plan: Arc<dyn CudfPlan>, expr: &CudfExprBuilder) -> Result<Self> {
+        let input_addr = input_plan.as_raw_ptr_addr();
+        let expr_addr = expr.as_raw_ptr_addr();
+        let plan = unsafe { ffi_new_filter_plan(input_addr, expr_addr) };
+        Ok(Self { plan })
+    }
+}
diff --git a/native-engine/blaze-cudf-bridge/src/plans/mod.rs b/native-engine/blaze-cudf-bridge/src/plans/mod.rs
new file mode 100644
index 0000000..175ebc8
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/plans/mod.rs
@@ -0,0 +1,41 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use arrow::error::Result;
+
+use crate::{ffi::*, tables::CudfTableStream, AsRawPtrAddr};
+
+pub mod filter;
+pub mod parquet_scan;
+pub mod project;
+pub mod split_table;
+pub mod union;
+
+/// Shared reference-counted handle to any cudf plan node.
+pub type CudfPlanRef = Arc<dyn CudfPlan>;
+
+/// A plan node that can be executed through the cudf bridge.
+pub trait CudfPlan: AsRawPtrAddr + Send + Sync + 'static {
+    /// Runs the plan via `ffi_execute_plan` and wraps the returned native
+    /// table stream. The FFI result is converted with `into_result`, using
+    /// "error execute plan" as its message.
+    fn execute(&self) -> Result<CudfTableStream> {
+        let plan_addr = self.as_raw_ptr_addr();
+        let raw_table_stream = unsafe { ffi_execute_plan(plan_addr) }
+            .into_result(String::from("error execute plan"))?;
+        Ok(CudfTableStream::from_raw_table_stream(raw_table_stream))
+    }
+}
diff --git a/native-engine/blaze-cudf-bridge/src/plans/parquet_scan.rs b/native-engine/blaze-cudf-bridge/src/plans/parquet_scan.rs
new file mode 100644
index 0000000..b52985f
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/plans/parquet_scan.rs
@@ -0,0 +1,75 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use arrow::{
+    datatypes::{DataType, SchemaRef},
+    error::Result,
+    ffi::FFI_ArrowSchema,
+};
+
+use crate::{
+    exprs::CudfExprBuilder,
+    ffi::*,
+    io::{CudfDataSource, IntoFFICudfDataSource},
+    plans::CudfPlan,
+    AsRawPtrAddr,
+};
+
+/// Handle to a native parquet scan plan created through the cudf bridge.
+pub struct CudfParquetScanPlan {
+    plan: FFI_PlanPtr,
+}
+
+impl AsRawPtrAddr for CudfParquetScanPlan {
+    /// Address of the wrapped native plan handle.
+    fn as_raw_ptr_addr(&self) -> usize {
+        self.plan.as_raw_ptr_addr()
+    }
+}
+
+impl CudfPlan for CudfParquetScanPlan {}
+
+impl CudfParquetScanPlan {
+    /// Builds the native scan plan via `ffi_new_parquet_scan_plan`.
+    ///
+    /// `schema` is exported as an Arrow C-data struct type first; `data_source`
+    /// is converted into its FFI callback table only afterwards, so a schema
+    /// export error still drops the data source normally.
+    ///
+    /// NOTE(review): the FFI data source struct goes out of scope when this
+    /// function returns; presumably the native side copies it and later calls
+    /// its `dtor` -- confirm.
+    pub fn try_new(
+        data_source: Box<dyn CudfDataSource>,
+        predicate_filter_expr: &CudfExprBuilder,
+        read_offset: usize,
+        read_size: usize,
+        schema: SchemaRef,
+    ) -> Result<Self> {
+        let struct_type = DataType::Struct(schema.fields().clone());
+        let mut ffi_schema = FFI_ArrowSchema::try_from(struct_type)?;
+        let mut ffi_data_source = data_source.into_ffi();
+        let filter_addr = predicate_filter_expr.as_raw_ptr_addr();
+        let plan = unsafe {
+            ffi_new_parquet_scan_plan(
+                &mut ffi_data_source,
+                filter_addr,
+                read_offset,
+                read_size,
+                &mut ffi_schema,
+            )
+        };
+        Ok(Self { plan })
+    }
+}
diff --git a/native-engine/blaze-cudf-bridge/src/plans/project.rs b/native-engine/blaze-cudf-bridge/src/plans/project.rs
new file mode 100644
index 0000000..a6ded26
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/plans/project.rs
@@ -0,0 +1,63 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use arrow::{
+    datatypes::{DataType, SchemaRef},
+    error::Result,
+    ffi::FFI_ArrowSchema,
+};
+
+use crate::{exprs::CudfExprBuilder, ffi::*, plans::CudfPlan, AsRawPtrAddr};
+
+/// Handle to a native projection plan created through the cudf bridge.
+pub struct CudfProjectPlan {
+    plan: FFI_PlanPtr,
+}
+
+impl AsRawPtrAddr for CudfProjectPlan {
+    /// Address of the wrapped native plan handle.
+    fn as_raw_ptr_addr(&self) -> usize {
+        self.plan.as_raw_ptr_addr()
+    }
+}
+
+impl CudfPlan for CudfProjectPlan {}
+
+impl CudfProjectPlan {
+    /// Builds the native projection plan via `ffi_new_project_plan` from the
+    /// input plan, the addresses of `exprs` and the exported `schema`.
+    pub fn try_new(
+        input_plan: Arc<dyn CudfPlan>,
+        exprs: &[&CudfExprBuilder],
+        schema: SchemaRef,
+    ) -> Result<Self> {
+        // The address list is handed to the native call as pointer + length, so
+        // it has to stay alive until that call has returned.
+        let mut expr_addrs = Vec::with_capacity(exprs.len());
+        for expr in exprs {
+            expr_addrs.push(expr.as_raw_ptr_addr());
+        }
+        let (exprs_ptr, exprs_len) = slice_to_const_ptr_and_len(&expr_addrs);
+
+        let struct_type = DataType::Struct(schema.fields().clone());
+        let mut ffi_schema = FFI_ArrowSchema::try_from(struct_type)?;
+        let input_addr = input_plan.as_raw_ptr_addr();
+
+        let plan =
+            unsafe { ffi_new_project_plan(input_addr, exprs_ptr, exprs_len, &mut ffi_schema) };
+        Ok(Self { plan })
+    }
+}
diff --git a/native-engine/blaze-cudf-bridge/src/plans/split_table.rs b/native-engine/blaze-cudf-bridge/src/plans/split_table.rs
new file mode 100644
index 0000000..65042a9
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/plans/split_table.rs
@@ -0,0 +1,42 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use crate::{ffi::*, plans::CudfPlan, AsRawPtrAddr};
+
+/// Handle to a native split-table plan created through the cudf bridge.
+pub struct CudfSplitTablePlan {
+    plan: FFI_PlanPtr,
+}
+
+impl AsRawPtrAddr for CudfSplitTablePlan {
+    /// Address of the wrapped native plan handle.
+    fn as_raw_ptr_addr(&self) -> usize {
+        self.plan.as_raw_ptr_addr()
+    }
+}
+
+impl CudfPlan for CudfSplitTablePlan {}
+
+impl CudfSplitTablePlan {
+    /// Builds the native plan via `ffi_new_split_table_plan` from the input
+    /// plan and `batch_size`.
+    pub fn new(input_plan: Arc<dyn CudfPlan>, batch_size: usize) -> Self {
+        let input_addr = input_plan.as_raw_ptr_addr();
+        Self {
+            plan: unsafe { ffi_new_split_table_plan(input_addr, batch_size) },
+        }
+    }
+}
diff --git a/native-engine/blaze-cudf-bridge/src/tables.rs b/native-engine/blaze-cudf-bridge/src/tables.rs
new file mode 100644
index 0000000..14abf91
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/tables.rs
@@ -0,0 +1,105 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use arrow::{
+    array::{make_array, AsArray, RecordBatch},
+    datatypes::{DataType, SchemaRef},
+    error::Result,
+    ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema},
+};
+
+use crate::{ffi::*, AsRawPtrAddr};
+
+/// Iterator over the tables produced by executing a cudf plan.
+pub struct CudfTableStream {
+    table_stream: FFI_TableStreamPtr,
+}
+
+impl AsRawPtrAddr for CudfTableStream {
+    /// Address of the wrapped native table stream handle.
+    fn as_raw_ptr_addr(&self) -> usize {
+        self.table_stream.as_raw_ptr_addr()
+    }
+}
+
+impl CudfTableStream {
+    /// Wraps a native table stream handle.
+    pub fn from_raw_table_stream(table_stream: FFI_TableStreamPtr) -> Self {
+        Self { table_stream }
+    }
+
+    /// Asks the native stream whether another table exists and, if so,
+    /// fetches it. Returns `Ok(None)` once the stream reports no more tables.
+    fn next_impl(&mut self) -> Result<Option<CudfTable>> {
+        let stream_addr = self.as_raw_ptr_addr();
+        let has_next = unsafe { ffi_table_stream_has_next(stream_addr) }
+            .into_result(String::from("error invoking table_stream.has_next()"))?;
+        if !has_next {
+            return Ok(None);
+        }
+        let raw_table = unsafe { ffi_table_stream_next(stream_addr) }
+            .into_result(String::from("error invoking table_stream.next()"))?;
+        Ok(Some(CudfTable::from_raw_table(raw_table)))
+    }
+}
+
+impl Iterator for CudfTableStream {
+    type Item = Result<CudfTable>;
+
+    /// Yields `Some(Err(..))` when a native call fails and `None` at the end.
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.next_impl() {
+            Ok(Some(table)) => Some(Ok(table)),
+            Ok(None) => None,
+            Err(err) => Some(Err(err)),
+        }
+    }
+}
+
+/// Handle to a native table returned by a `CudfTableStream`.
+pub struct CudfTable {
+    table: FFI_TablePtr,
+}
+
+impl AsRawPtrAddr for CudfTable {
+    /// Address of the wrapped native table handle.
+    fn as_raw_ptr_addr(&self) -> usize {
+        self.table.as_raw_ptr_addr()
+    }
+}
+
+impl CudfTable {
+    /// Wraps a native table handle.
+    pub fn from_raw_table(table: FFI_TablePtr) -> Self {
+        Self { table }
+    }
+
+    /// Exports the table through the Arrow C data interface and imports it as
+    /// a `RecordBatch` whose columns are typed according to `schema`.
+    pub fn to_arrow_record_batch(&self, schema: SchemaRef) -> Result<RecordBatch> {
+        let struct_type = DataType::Struct(schema.fields().clone());
+        let mut ffi_schema = FFI_ArrowSchema::try_from(struct_type.clone())?;
+        let mut ffi_array = FFI_ArrowArray::empty();
+        let table_addr = self.as_raw_ptr_addr();
+
+        let export_result =
+            unsafe { ffi_table_to_arrow(table_addr, &mut ffi_schema, &mut ffi_array) };
+        export_result.into_result(String::from(
+            "error converting cudf::table to arrow record batch",
+        ))?;
+        let array_data = unsafe { from_ffi_and_data_type(ffi_array, struct_type)? };
+        let struct_array = make_array(array_data).as_struct().clone();
+        Ok(RecordBatch::from(struct_array))
+    }
+}
diff --git a/native-engine/blaze-jni-bridge/src/conf.rs b/native-engine/blaze-jni-bridge/src/conf.rs
index a00a18e..e695d14 100644
--- a/native-engine/blaze-jni-bridge/src/conf.rs
+++ b/native-engine/blaze-jni-bridge/src/conf.rs
@@ -51,6 +51,7 @@
 define_conf!(IntConf, SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE);
 define_conf!(BooleanConf, ORC_FORCE_POSITIONAL_EVOLUTION);
 define_conf!(IntConf, UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG);
+define_conf!(BooleanConf, ENABLE_CUDA);
 
 pub trait BooleanConf {
     fn key(&self) -> &'static str;
diff --git a/native-engine/datafusion-ext-commons/src/hadoop_fs.rs b/native-engine/datafusion-ext-commons/src/hadoop_fs.rs
index fe6353a..5401ac0 100644
--- a/native-engine/datafusion-ext-commons/src/hadoop_fs.rs
+++ b/native-engine/datafusion-ext-commons/src/hadoop_fs.rs
@@ -39,6 +39,8 @@
 
     pub fn mkdirs(&self, path: &str) -> Result<()> {
         let _timer = self.io_time.timer();
+        log::info!("hdfs mkdirs: {path}");
+
         let path_str = jni_new_string!(path)?;
         let path_uri = jni_new_object!(JavaURI(path_str.as_obj()))?;
         let path = jni_new_object!(HadoopPath(path_uri.as_obj()))?;
@@ -53,6 +55,8 @@
 
     pub fn open(&self, path: &str) -> Result<Arc<FsDataInputWrapper>> {
         let _timer = self.io_time.timer();
+        log::info!("hdfs open: {path}");
+
         let path = jni_new_string!(path)?;
         let wrapper = jni_call_static!(
             JniBridge.openFileAsDataInputWrapper(self.fs.as_obj(), path.as_obj()) -> JObject
@@ -66,6 +70,8 @@
 
     pub fn create(&self, path: &str) -> Result<Arc<FsDataOutputWrapper>> {
         let _timer = self.io_time.timer();
+        log::info!("hdfs create: {path}");
+
         let path = jni_new_string!(path)?;
         let wrapper = jni_call_static!(
             JniBridge.createFileAsDataOutputWrapper(self.fs.as_obj(), path.as_obj()) -> JObject
diff --git a/native-engine/datafusion-ext-plans/Cargo.toml b/native-engine/datafusion-ext-plans/Cargo.toml
index 8ae69f5..b65fb91 100644
--- a/native-engine/datafusion-ext-plans/Cargo.toml
+++ b/native-engine/datafusion-ext-plans/Cargo.toml
@@ -11,6 +11,7 @@
 arrow = { workspace = true }
 arrow-schema = { workspace = true }
 blaze-jni-bridge = { workspace = true }
+blaze-cudf-bridge = { workspace = true }
 datafusion = { workspace = true }
 datafusion-ext-commons = { workspace = true }
 datafusion-ext-exprs = { workspace = true }
diff --git a/native-engine/datafusion-ext-plans/src/common/execution_context.rs b/native-engine/datafusion-ext-plans/src/common/execution_context.rs
index ccb6076..772b130 100644
--- a/native-engine/datafusion-ext-plans/src/common/execution_context.rs
+++ b/native-engine/datafusion-ext-plans/src/common/execution_context.rs
@@ -22,7 +22,11 @@
 };
 
 use arrow::{array::RecordBatch, datatypes::SchemaRef};
-use blaze_jni_bridge::{conf, conf::BooleanConf, is_task_running};
+use blaze_cudf_bridge::plans::{split_table::CudfSplitTablePlan, CudfPlan};
+use blaze_jni_bridge::{
+    conf::{self, BooleanConf},
+    is_task_running,
+};
 use datafusion::{
     common::Result,
     execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext},
@@ -44,6 +48,7 @@
 
 use crate::{
     common::{column_pruning::ExecuteWithColumnPruning, timer_helper::TimerHelper},
+    cudf::plan::convert_datafusion_plan_to_cudf,
     memmgr::metrics::SpillMetrics,
 };
 
@@ -240,6 +245,63 @@
         input.execute_projected(self.partition_id, self.task_ctx.clone(), projection)
     }
 
+    /// Executes `plan` through the cudf bridge and streams the results back as
+    /// Arrow record batches.
+    ///
+    /// Returns an execution error when the `ENABLE_CUDA` conf is false or
+    /// cannot be read, and propagates the conversion error when `plan` cannot
+    /// be translated into a cudf plan.
+    pub fn execute_with_cudf(
+        self: &Arc<Self>,
+        plan: &dyn ExecutionPlan,
+    ) -> Result<SendableRecordBatchStream> {
+        let cuda_enabled = conf::ENABLE_CUDA.value().unwrap_or(false);
+        if !cuda_enabled {
+            return df_execution_err!("blaze CUDA support is not enabled");
+        }
+        let exec_ctx = self.clone();
+        let cudf_elapsed_compute = exec_ctx.register_timer_metric("cudf_elapsed_compute");
+        let cudf_plan = convert_datafusion_plan_to_cudf(plan).inspect_err(|err| {
+            log::info!("convert plan to cudf-bridge error: {err}");
+        })?;
+
+        let stream = exec_ctx
+            .clone()
+            .output_with_sender("CudfStream", move |sender| async move {
+                let _cudf_timer = cudf_elapsed_compute.timer();
+                sender.exclude_time(&cudf_elapsed_compute);
+
+                log::info!("****** executing with Blaze + CUDA (libcudf) ******");
+
+                let split_plan: Arc<dyn CudfPlan> =
+                    Arc::new(CudfSplitTablePlan::new(cudf_plan, batch_size()));
+                let mut cudf_table_stream = Box::pin(split_plan.execute().inspect_err(|err| {
+                    log::info!("executing cudf-bridge plan error: {err}");
+                })?);
+
+                loop {
+                    let output_schema = exec_ctx.output_schema();
+                    // The closure makes synchronous calls into the cudf bridge.
+                    let next_batch =
+                        tokio::task::block_in_place(|| -> Result<Option<RecordBatch>> {
+                            match cudf_table_stream.as_mut().next().transpose()? {
+                                Some(cudf_table) => {
+                                    Ok(Some(cudf_table.to_arrow_record_batch(output_schema)?))
+                                }
+                                None => Ok(None),
+                            }
+                        })?;
+                    let Some(batch) = next_batch else {
+                        break;
+                    };
+                    exec_ctx.baseline_metrics().record_output(batch.num_rows());
+                    sender.send(batch).await;
+                }
+                Ok(())
+            });
+        Ok(stream)
+    }
+
     pub fn stat_input(
         self: &Arc<Self>,
         input: SendableRecordBatchStream,
diff --git a/native-engine/datafusion-ext-plans/src/cudf/expr.rs b/native-engine/datafusion-ext-plans/src/cudf/expr.rs
new file mode 100644
index 0000000..3293b17
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/cudf/expr.rs
@@ -0,0 +1,112 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use blaze_cudf_bridge::exprs::{CudfExprBuilder, CudfExprOperator, CudfScalar};
+use datafusion::{
+    error::Result,
+    logical_expr::Operator,
+    physical_expr::PhysicalExprRef,
+    physical_plan::expressions::{
+        BinaryExpr, Column, IsNotNullExpr, IsNullExpr, Literal, NegativeExpr,
+    },
+    scalar::ScalarValue,
+};
+use datafusion_ext_commons::{df_unimplemented_err, downcast_any};
+
+/// Translates a DataFusion physical expression into a cudf expression builder.
+///
+/// Columns are referenced by name when `use_column_name` is true and by index
+/// otherwise. Returns an "unimplemented" error for any expression, literal or
+/// operator that has no conversion here.
+pub fn convert_datafusion_expr_to_cudf(
+    de: &PhysicalExprRef,
+    use_column_name: bool,
+) -> Result<CudfExprBuilder> {
+    let root_expr_builder = CudfExprBuilder::new();
+    // Only the builder is returned; the id of the root node is not needed.
+    convert_impl(de, &root_expr_builder, use_column_name)?;
+    Ok(root_expr_builder)
+}
+
+/// Recursively adds `de` and its children to the builder `b`, returning the
+/// expression id that the builder handed out for `de`.
+fn convert_impl(de: &PhysicalExprRef, b: &CudfExprBuilder, use_column_name: bool) -> Result<usize> {
+    if let Ok(lit) = downcast_any!(de, Literal) {
+        let cudf_scalar = match lit.value() {
+            ScalarValue::Boolean(Some(v)) => CudfScalar::new_bool(*v),
+            ScalarValue::Int8(Some(v)) => CudfScalar::new_prim(*v),
+            ScalarValue::Int16(Some(v)) => CudfScalar::new_prim(*v),
+            ScalarValue::Int32(Some(v)) => CudfScalar::new_prim(*v),
+            ScalarValue::Int64(Some(v)) => CudfScalar::new_prim(*v),
+            ScalarValue::UInt8(Some(v)) => CudfScalar::new_prim(*v),
+            ScalarValue::UInt16(Some(v)) => CudfScalar::new_prim(*v),
+            ScalarValue::UInt32(Some(v)) => CudfScalar::new_prim(*v),
+            ScalarValue::UInt64(Some(v)) => CudfScalar::new_prim(*v),
+            ScalarValue::Float32(Some(v)) => CudfScalar::new_prim(*v),
+            ScalarValue::Float64(Some(v)) => CudfScalar::new_prim(*v),
+            ScalarValue::Utf8(Some(v)) => CudfScalar::new_string(v),
+            _ => return df_unimplemented_err!("cannot convert scalar to cudf: {lit:?}"),
+        };
+        return Ok(b.add_literal(&cudf_scalar));
+    } else if let Ok(col) = downcast_any!(de, Column) {
+        if use_column_name {
+            return Ok(b.add_column_name_ref(col.name()));
+        } else {
+            return Ok(b.add_column_ref(col.index()));
+        }
+    } else if let Ok(neg) = downcast_any!(de, NegativeExpr) {
+        let arg_expr_id = convert_impl(neg.arg(), b, use_column_name)?;
+        // NOTE(review): SUB is also what `convert_op` emits for binary minus;
+        // confirm that the native side accepts it as a unary negation.
+        return Ok(b.add_unary(arg_expr_id, CudfExprOperator::SUB));
+    } else if let Ok(binary) = downcast_any!(de, BinaryExpr) {
+        let left_expr_id = convert_impl(binary.left(), b, use_column_name)?;
+        let right_expr_id = convert_impl(binary.right(), b, use_column_name)?;
+        let op = convert_op(binary.op())?;
+        return Ok(b.add_binary(left_expr_id, right_expr_id, op));
+    } else if let Ok(is_null) = downcast_any!(de, IsNullExpr) {
+        let arg_expr_id = convert_impl(is_null.arg(), b, use_column_name)?;
+        return Ok(b.add_unary(arg_expr_id, CudfExprOperator::IS_NULL));
+    } else if let Ok(is_not_null) = downcast_any!(de, IsNotNullExpr) {
+        let arg_expr_id = convert_impl(is_not_null.arg(), b, use_column_name)?;
+        // IS NOT NULL is expressed as NOT(IS_NULL(arg)).
+        let is_null_expr_id = b.add_unary(arg_expr_id, CudfExprOperator::IS_NULL);
+        return Ok(b.add_unary(is_null_expr_id, CudfExprOperator::NOT));
+    }
+    df_unimplemented_err!("cannot convert expr to cudf: {de:?}")
+}
+
+/// Maps a DataFusion binary operator to the corresponding cudf operator;
+/// operators without a mapping yield an "unimplemented" error.
+fn convert_op(op: &Operator) -> Result<CudfExprOperator> {
+    Ok(match op {
+        Operator::Eq => CudfExprOperator::EQUAL,
+        Operator::NotEq => CudfExprOperator::NOT_EQUAL,
+        Operator::Lt => CudfExprOperator::LESS,
+        Operator::LtEq => CudfExprOperator::LESS_EQUAL,
+        Operator::Gt => CudfExprOperator::GREATER,
+        Operator::GtEq => CudfExprOperator::GREATER_EQUAL,
+        Operator::Plus => CudfExprOperator::ADD,
+        Operator::Minus => CudfExprOperator::SUB,
+        Operator::Multiply => CudfExprOperator::MUL,
+        Operator::Divide => CudfExprOperator::DIV,
+        Operator::Modulo => CudfExprOperator::MOD,
+        Operator::And => CudfExprOperator::LOGICAL_AND,
+        Operator::Or => CudfExprOperator::LOGICAL_OR,
+        Operator::BitwiseAnd => CudfExprOperator::BITWISE_AND,
+        Operator::BitwiseOr => CudfExprOperator::BITWISE_OR,
+        Operator::BitwiseXor => CudfExprOperator::BITWISE_XOR,
+        other => return df_unimplemented_err!("cannot convert operator to cudf: {other:?}"),
+    })
+}
diff --git a/native-engine/datafusion-ext-plans/src/cudf/mod.rs b/native-engine/datafusion-ext-plans/src/cudf/mod.rs
new file mode 100644
index 0000000..048bcb3
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/cudf/mod.rs
@@ -0,0 +1,16 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod expr;
+pub mod plan;
diff --git a/native-engine/datafusion-ext-plans/src/cudf/plan.rs b/native-engine/datafusion-ext-plans/src/cudf/plan.rs
new file mode 100644
index 0000000..d015119
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/cudf/plan.rs
@@ -0,0 +1,209 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use blaze_cudf_bridge::{
+    io::CudfDataSource,
+    plans::{
+        filter::CudfFilterPlan, parquet_scan::CudfParquetScanPlan, project::CudfProjectPlan,
+        union::CudfUnionPlan, CudfPlan, CudfPlanRef,
+    },
+};
+use datafusion::{
+    datasource::listing::FileRange,
+    error::Result,
+    logical_expr::Operator,
+    physical_expr::PhysicalExprRef,
+    physical_plan::{
+        expressions::{lit, BinaryExpr, Column, Literal, SCAndExpr},
+        metrics::Time,
+        ExecutionPlan,
+    },
+};
+use datafusion_ext_commons::{df_unimplemented_err, downcast_any};
+
+use super::expr::convert_datafusion_expr_to_cudf;
+use crate::{
+    filter_exec::FilterExec, parquet_exec::ParquetExec, project_exec::ProjectExec,
+    rename_columns_exec::RenameColumnsExec, scan::internal_file_reader::InternalFileReader,
+};
+
+/// Converts a DataFusion plan node to cudf using the converter that matches its
+/// concrete type; plan types without a converter yield an "unimplemented" error.
+pub fn convert_datafusion_plan_to_cudf(exec: &dyn ExecutionPlan) -> Result<CudfPlanRef> {
+    if let Ok(scan) = downcast_any!(exec, ParquetExec) {
+        convert_parquet_scan(scan)
+    } else if let Ok(rename) = downcast_any!(exec, RenameColumnsExec) {
+        convert_rename_columns(rename)
+    } else if let Ok(project) = downcast_any!(exec, ProjectExec) {
+        convert_project(project)
+    } else if let Ok(filter) = downcast_any!(exec, FilterExec) {
+        convert_filter(filter)
+    } else {
+        df_unimplemented_err!("cudf not supported for plan: {exec:?}")
+    }
+}
+
+/// Converts a parquet scan into one cudf scan per file (joined by a
+/// `CudfUnionPlan` when there are several), pushing down only the
+/// `<column op literal>` conjuncts. Files with partition values yield an error.
+fn convert_parquet_scan(exec: &ParquetExec) -> Result<CudfPlanRef> {
+    let io_time = Time::new();
+    let config = exec.file_scan_config();
+    let fs_provider = exec.get_fs_provider(&io_time)?;
+    let mut inputs = vec![];
+    for part_file in config.file_groups.iter().flatten() {
+        // return an error rather than panic so callers can handle unsupported input
+        if !part_file.partition_values.is_empty() {
+            return df_unimplemented_err!(
+                "XXX partition tables are not yet supported: {:?}",
+                part_file.partition_values,
+            );
+        }
+        let whole_file = FileRange { start: 0, end: i64::MAX };
+        let range = part_file.range.clone().unwrap_or(whole_file);
+        inputs.push((part_file.object_meta.clone(), range));
+    }
+
+    struct InternalFileDataSource {
+        file_reader: InternalFileReader,
+        file_size: usize,
+    }
+    impl CudfDataSource for InternalFileDataSource {
+        fn data_size(&self) -> usize {
+            self.file_size
+        }
+        fn read(&mut self, offset: usize, buf: &mut [u8]) -> arrow::error::Result<usize> {
+            let range = offset..offset + buf.len();
+            self.file_reader.read_fully_into_buffer(range, buf)?;
+            Ok(buf.len())
+        }
+    }
+
+    let schema = exec.schema();
+
+    // cudf only supports <column cmp literal> filters
+    let predicate_exprs = split_expr_by_logical_ands(exec.predicate_expr().unwrap_or(lit(true)))
+        .into_iter()
+        .filter(|expr| {
+            matches!(
+            downcast_any!(&expr, BinaryExpr),
+            Ok(expr) if
+                downcast_any!(expr.left(), Column).is_ok() &&
+                downcast_any!(expr.right(), Literal).is_ok())
+        })
+        .collect::<Vec<_>>();
+    let predicate_expr = join_exprs_by_logical_ands(&predicate_exprs);
+    let cudf_predicate_expr = convert_datafusion_expr_to_cudf(&predicate_expr, true)?;
+
+    let mut scan_plans: Vec<Arc<dyn CudfPlan>> = vec![];
+    for (object_meta, range) in inputs {
+        let data_source = Box::new(InternalFileDataSource {
+            file_size: object_meta.size,
+            file_reader: InternalFileReader::try_new(fs_provider.clone(), object_meta.clone())?,
+        });
+        let scan_plan = Arc::new(CudfParquetScanPlan::try_new(
+            data_source,
+            &cudf_predicate_expr,
+            range.start as usize,
+            range.end as usize - range.start as usize,
+            schema.clone(),
+        )?);
+        scan_plans.push(scan_plan);
+    }
+
+    if scan_plans.len() == 1 {
+        return Ok(scan_plans[0].clone());
+    }
+    Ok(Arc::new(CudfUnionPlan::try_new(&scan_plans, schema)?))
+}
+
+/// Converts a rename into a positional cudf projection carrying the output schema;
+/// errors unless both schemas agree on field count, data types and nullability.
+fn convert_rename_columns(exec: &RenameColumnsExec) -> Result<CudfPlanRef> {
+    let output_schema = exec.schema();
+    let input_schema = exec.children()[0].schema();
+    // names are expected to differ (that is the rename); the column layout must not
+    let is_pure_rename = output_schema.fields().len() == input_schema.fields().len()
+        && output_schema.fields().iter().zip(input_schema.fields()).all(|(f1, f2)| {
+            f1.data_type() == f2.data_type() && f1.is_nullable() == f2.is_nullable()
+        });
+    if !is_pure_rename {
+        return df_unimplemented_err!(
+            "cudf-bridge only supports RenameColumnsExec with same data types"
+        );
+    }
+
+    let input = convert_datafusion_plan_to_cudf(exec.children()[0].as_ref())?;
+    let cudf_project_exprs = (0..output_schema.fields().len())
+        .map(|i| {
+            let col: PhysicalExprRef = Arc::new(Column::new("", i));
+            Ok(convert_datafusion_expr_to_cudf(&col, false)?)
+        })
+        .collect::<Result<Vec<_>>>()?;
+    Ok(Arc::new(CudfProjectPlan::try_new(
+        input,
+        &cudf_project_exprs.iter().collect::<Vec<_>>(),
+        output_schema,
+    )?))
+}
+
+/// Converts a projection into a `CudfProjectPlan`: every projected expression is
+/// translated to cudf (the paired names are ignored; the plan's schema is passed
+/// along instead) and applied on top of the converted child plan.
+fn convert_project(exec: &ProjectExec) -> Result<CudfPlanRef> {
+    let mut cudf_exprs = Vec::with_capacity(exec.named_exprs().len());
+    for (expr, _name) in exec.named_exprs() {
+        cudf_exprs.push(convert_datafusion_expr_to_cudf(expr, false)?);
+    }
+
+    let cudf_input = convert_datafusion_plan_to_cudf(exec.children()[0].as_ref())?;
+    let expr_refs: Vec<_> = cudf_exprs.iter().collect();
+    let plan = CudfProjectPlan::try_new(cudf_input, &expr_refs, exec.schema())?;
+    Ok(Arc::new(plan))
+}
+
+/// Converts a filter: its predicates are AND-ed into a single expression,
+/// translated to cudf, and applied to the converted child plan.
+fn convert_filter(exec: &FilterExec) -> Result<CudfPlanRef> {
+    let conjunction = join_exprs_by_logical_ands(exec.predicates());
+    let cudf_condition = convert_datafusion_expr_to_cudf(&conjunction, false)?;
+    let child = convert_datafusion_plan_to_cudf(exec.children()[0].as_ref())?;
+    let plan = CudfFilterPlan::try_new(child, &cudf_condition)?;
+    Ok(Arc::new(plan))
+}
+
+/// Flattens nested conjunctions (`SCAndExpr`, or `BinaryExpr` with `And`) into
+/// their non-AND operands, kept in left-to-right order.
+fn split_expr_by_logical_ands(expr: PhysicalExprRef) -> Vec<PhysicalExprRef> {
+    let sc_and = downcast_any!(&expr, SCAndExpr).ok().map(|e| (e.left.clone(), e.right.clone()));
+    let bin_and = downcast_any!(&expr, BinaryExpr)
+        .ok()
+        .filter(|e| e.op() == &Operator::And)
+        .map(|e| (e.left().clone(), e.right().clone()));
+    match sc_and.or(bin_and) {
+        Some((left, right)) => {
+            [split_expr_by_logical_ands(left), split_expr_by_logical_ands(right)].concat()
+        }
+        None => vec![expr],
+    }
+}
+
+/// ANDs all expressions together (left-associative); `lit(true)` when empty.
+fn join_exprs_by_logical_ands(expr: &[PhysicalExprRef]) -> PhysicalExprRef {
+    let and = |lhs, rhs| Arc::new(BinaryExpr::new(lhs, Operator::And, rhs)) as PhysicalExprRef;
+    let always_true = lit(true);
+    expr.iter().cloned().reduce(and).unwrap_or(always_true)
+}
diff --git a/native-engine/datafusion-ext-plans/src/filter_exec.rs b/native-engine/datafusion-ext-plans/src/filter_exec.rs
index 20cdb4d..18c7f50 100644
--- a/native-engine/datafusion-ext-plans/src/filter_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/filter_exec.rs
@@ -127,8 +127,12 @@
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let predicates = self.predicates.clone();
         let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics);
+        if let Ok(stream) = exec_ctx.execute_with_cudf(self) {
+            return Ok(stream);
+        }
+
+        let predicates = self.predicates.clone();
         let input = exec_ctx.execute_with_input_stats(&self.input)?;
         let filtered = execute_filter(input, predicates, exec_ctx.clone())?;
         Ok(exec_ctx.coalesce_with_default_batch_size(filtered))
diff --git a/native-engine/datafusion-ext-plans/src/lib.rs b/native-engine/datafusion-ext-plans/src/lib.rs
index 505b1d1..8e56ce8 100644
--- a/native-engine/datafusion-ext-plans/src/lib.rs
+++ b/native-engine/datafusion-ext-plans/src/lib.rs
@@ -54,6 +54,9 @@
 pub mod common;
 pub mod generate;
 pub mod joins;
-mod scan;
+pub mod scan;
 pub mod shuffle;
 pub mod window;
+
+// cudf integration
+mod cudf;
diff --git a/native-engine/datafusion-ext-plans/src/parquet_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
index 26864ee..a4e845a 100644
--- a/native-engine/datafusion-ext-plans/src/parquet_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
@@ -40,7 +40,7 @@
     physical_expr::EquivalenceProperties,
     physical_optimizer::pruning::PruningPredicate,
     physical_plan::{
-        metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+        metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time},
         DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PhysicalExpr,
         PlanProperties, SendableRecordBatchStream, Statistics,
     },
@@ -59,9 +59,10 @@
 };
 
 /// Execution plan for scanning one or more Parquet partitions
-#[derive(Debug, Clone)]
+#[derive(Clone)]
 pub struct ParquetExec {
     fs_resource_id: String,
+    fs_provider: OnceCell<Arc<FsProvider>>,
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
@@ -72,6 +73,22 @@
     props: OnceCell<PlanProperties>,
 }
 
+impl std::fmt::Debug for ParquetExec {
+    /// Formats all fields except `fs_provider`, which is left out of the output.
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        let mut out = f.debug_struct("ParquetExec");
+        out.field("fs_resource_id", &self.fs_resource_id);
+        out.field("base_config", &self.base_config);
+        out.field("projected_statistics", &self.projected_statistics);
+        out.field("projected_schema", &self.projected_schema);
+        out.field("metrics", &self.metrics);
+        out.field("predicate", &self.predicate);
+        out.field("pruning_predicate", &self.pruning_predicate);
+        out.field("page_pruning_predicate", &self.page_pruning_predicate);
+        out.field("props", &self.props).finish()
+    }
+}
+
 impl ParquetExec {
     /// Create a new Parquet reader execution plan provided file list and
     /// schema.
@@ -108,6 +125,7 @@
 
         Self {
             fs_resource_id,
+            fs_provider: OnceCell::new(),
             base_config,
             projected_schema,
             projected_statistics,
@@ -118,6 +136,25 @@
             props: OnceCell::new(),
         }
     }
+
+    /// Returns the cached fs provider, creating it from the JNI resource
+    /// `fs_resource_id` on first use; only that first call's `io_time` is used.
+    pub fn get_fs_provider(&self, io_time: &Time) -> Result<Arc<FsProvider>> {
+        let init = || -> Result<Arc<FsProvider>> {
+            let id = jni_new_string!(&self.fs_resource_id)?;
+            let fs = jni_call_static!(JniBridge.getResource(id.as_obj()) -> JObject)?;
+            Ok(Arc::new(FsProvider::new(jni_new_global_ref!(fs.as_obj())?, io_time)))
+        };
+        self.fs_provider.get_or_try_init(init).cloned()
+    }
+
+    pub fn file_scan_config(&self) -> &FileScanConfig {
+        &self.base_config // the scan configuration stored in `base_config`, by reference
+    }
+
+    pub fn predicate_expr(&self) -> Option<Arc<dyn PhysicalExpr>> {
+        self.predicate.as_ref().map(Arc::clone) // new handle to the predicate, if one is set
+    }
 }
 
 impl DisplayAs for ParquetExec {
@@ -184,15 +221,14 @@
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics);
+        if let Ok(stream) = exec_ctx.execute_with_cudf(self) {
+            return Ok(stream);
+        }
+
         let elapsed_compute = exec_ctx.baseline_metrics().elapsed_compute().clone();
         let _timer = elapsed_compute.timer();
         let io_time = exec_ctx.register_timer_metric("io_time");
 
-        // get fs object from jni bridge resource
-        let resource_id = jni_new_string!(&self.fs_resource_id)?;
-        let fs = jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?;
-        let fs_provider = Arc::new(FsProvider::new(jni_new_global_ref!(fs.as_obj())?, &io_time));
-
         let schema_adapter_factory = Arc::new(BlazeSchemaAdapterFactory);
         let projection = match self.base_config.file_column_projection_indices() {
             Some(proj) => proj,
@@ -213,7 +249,9 @@
             table_schema: self.base_config.file_schema.clone(),
             metadata_size_hint: None,
             metrics: self.metrics.clone(),
-            parquet_file_reader_factory: Arc::new(FsReaderFactory::new(fs_provider)),
+            parquet_file_reader_factory: Arc::new(FsReaderFactory::new(
+                self.get_fs_provider(&io_time)?,
+            )),
             pushdown_filters: page_filtering_enabled,
             reorder_filters: page_filtering_enabled,
             enable_page_index: page_filtering_enabled,
diff --git a/native-engine/datafusion-ext-plans/src/project_exec.rs b/native-engine/datafusion-ext-plans/src/project_exec.rs
index 86e96a2..4098b07 100644
--- a/native-engine/datafusion-ext-plans/src/project_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/project_exec.rs
@@ -78,6 +78,10 @@
             props: OnceCell::new(),
         })
     }
+
+    pub fn named_exprs(&self) -> &[(PhysicalExprRef, String)] {
+        &self.expr[..] // the (expression, name) pairs held in `expr`, borrowed as a slice
+    }
 }
 
 impl DisplayAs for ProjectExec {
@@ -136,8 +140,11 @@
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics);
-        let exprs: Vec<PhysicalExprRef> = self.expr.iter().map(|(e, _name)| e.clone()).collect();
+        if let Ok(stream) = exec_ctx.execute_with_cudf(self) {
+            return Ok(stream);
+        }
 
+        let exprs: Vec<PhysicalExprRef> = self.expr.iter().map(|(e, _name)| e.clone()).collect();
         let output = if let Ok(filter_exec) = downcast_any!(self.input, FilterExec) {
             execute_project_with_filtering(
                 filter_exec.children()[0].clone(),
diff --git a/native-engine/datafusion-ext-plans/src/scan/internal_file_reader.rs b/native-engine/datafusion-ext-plans/src/scan/internal_file_reader.rs
index 507d2ba..c1d7c13 100644
--- a/native-engine/datafusion-ext-plans/src/scan/internal_file_reader.rs
+++ b/native-engine/datafusion-ext-plans/src/scan/internal_file_reader.rs
@@ -61,6 +61,12 @@
         Ok(input.clone())
     }
 
+    pub fn read_fully_into_buffer(&self, range: Range<usize>, buf: &mut [u8]) -> Result<()> {
+        assert_eq!(range.len(), buf.len()); // panics unless `buf` is exactly as long as `range`
+        self.get_input()?.read_fully(range.start as u64, buf)?; // read starts at `range.start`
+        Ok(())
+    }
+
     pub fn read_fully(&self, range: Range<usize>) -> Result<Bytes> {
         let mut bytes = vec![0u8; range.len()];
         self.get_input()?
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/uniffle/BlazeUniffleShuffleReader.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/uniffle/BlazeUniffleShuffleReader.scala
index 79ab98d..4415570 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/uniffle/BlazeUniffleShuffleReader.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/uniffle/BlazeUniffleShuffleReader.scala
@@ -89,12 +89,13 @@
     FieldUtils.readField(reader, "readMetrics", true).asInstanceOf[ShuffleReadMetrics]
 
   override protected def readBlocks(): Iterator[InputStream] = {
-    logInfo(s"Shuffle read started: " +
-      s"appId=$appId" +
-      s", shuffleId=$shuffleId" +
-      s", taskId=$taskId" +
-      s", partitions: [$startPartition, $endPartition)" +
-      s", maps: [$startMapIndex, $endMapIndex)")
+    logInfo(
+      s"Shuffle read started: " +
+        s"appId=$appId" +
+        s", shuffleId=$shuffleId" +
+        s", taskId=$taskId" +
+        s", partitions: [$startPartition, $endPartition)" +
+        s", maps: [$startMapIndex, $endMapIndex)")
 
     val inputStream =
       new UniffleInputStream(
diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
index f3e324b..47fe0bc 100644
--- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
+++ b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
@@ -103,7 +103,10 @@
     // batches in memory at the same time
     SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE("spark.blaze.suggested.batch.memSize.multiwayMerging", 1048576),
 
-    ORC_FORCE_POSITIONAL_EVOLUTION("spark.blaze.orc.force.positional.evolution", false);
+    ORC_FORCE_POSITIONAL_EVOLUTION("spark.blaze.orc.force.positional.evolution", false),
+
+    // enable CUDA support (via libcudf)
+    ENABLE_CUDA("spark.blaze.enable.cuda", false);
 
     public final String key;
     private final Object defaultValue;
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala
index 751b1cb..adca738 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala
@@ -99,6 +99,7 @@
       "output_rows" -> metric("Native.output_rows"),
       "output_batches" -> metric("Native.output_batches"),
       "elapsed_compute" -> nanoTimingMetric("Native.elapsed_compute"),
+      "cudf_elapsed_compute" -> nanoTimingMetric("Native.cudf_elapsed_compute"),
       "build_hash_map_time" -> nanoTimingMetric("Native.build_hash_map_time"),
       "probed_side_hash_time" -> nanoTimingMetric("Native.probed_side_hash_time"),
       "probed_side_search_time" -> nanoTimingMetric("Native.probed_side_search_time"),
@@ -125,6 +126,9 @@
   private def getDefaultNativeFileMetrics(sc: SparkContext): Map[String, SQLMetric] = {
     TreeMap(
       "bytes_scanned" -> SQLMetrics.createSizeMetric(sc, "Native.bytes_scanned"),
+      "cudf_elapsed_compute" -> SQLMetrics.createNanoTimingMetric(
+        sc,
+        "Native.cudf_elapsed_compute"),
       "io_time" -> SQLMetrics.createNanoTimingMetric(sc, "Native.io_time"),
       "io_time_getfs" -> SQLMetrics.createNanoTimingMetric(sc, "Native.io_time_getfs"),
       // Parquet metrics
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterBase.scala
index 136e7b9..ea8b5d1 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterBase.scala
@@ -52,6 +52,7 @@
           "stage_id",
           "output_rows",
           "elapsed_compute",
+          "cudf_elapsed_compute",
           "input_batch_count",
           "input_batch_mem_size",
           "input_row_count"))
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectBase.scala
index 852fff5..1ccdc30 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectBase.scala
@@ -52,6 +52,7 @@
           "stage_id",
           "output_rows",
           "elapsed_compute",
+          "cudf_elapsed_compute",
           "input_batch_count",
           "input_batch_mem_size",
           "input_row_count"))