WIP: initialize GPU support with libcudf
diff --git a/Cargo.lock b/Cargo.lock index 2d4f340..26dffd7 100644 --- a/Cargo.lock +++ b/Cargo.lock
@@ -521,6 +521,19 @@ ] [[package]] +name = "blaze-cudf-bridge" +version = "0.1.0" +dependencies = [ + "arrow", + "datafusion", + "datafusion-ext-commons", + "log", + "num", + "parquet", + "paste", +] + +[[package]] name = "blaze-jni-bridge" version = "0.1.0" dependencies = [ @@ -1099,6 +1112,7 @@ "async-trait", "base64 0.22.1", "bitvec", + "blaze-cudf-bridge", "blaze-jni-bridge", "byteorder", "bytes 1.10.1",
diff --git a/Cargo.toml b/Cargo.toml index e280a56..4972028 100644 --- a/Cargo.toml +++ b/Cargo.toml
@@ -22,6 +22,7 @@ "native-engine/blaze", "native-engine/blaze-jni-bridge", "native-engine/blaze-serde", + "native-engine/blaze-cudf-bridge", ] [profile.release] @@ -45,6 +46,7 @@ [workspace.dependencies] blaze = { path = "./native-engine/blaze" } blaze-jni-bridge = { path = "./native-engine/blaze-jni-bridge" } +blaze-cudf-bridge = { path = "./native-engine/blaze-cudf-bridge" } blaze-serde = { path = "./native-engine/blaze-serde" } datafusion-ext-commons = { path = "./native-engine/datafusion-ext-commons" } datafusion-ext-exprs = { path = "./native-engine/datafusion-ext-exprs" }
diff --git a/native-engine/blaze-cudf-bridge/Cargo.lock b/native-engine/blaze-cudf-bridge/Cargo.lock new file mode 100644 index 0000000..8c47323 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/Cargo.lock
@@ -0,0 +1,1248 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "const-random", + "getrandom 0.2.16", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "arrow" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3095aaf545942ff5abd46654534f15b03a90fba78299d661e045e5d587222f0d" +dependencies = [ + 
"arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-csv", + "arrow-data", + "arrow-ipc", + "arrow-json", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00752064ff47cee746e816ddb8450520c3a52cbad1e256f6fa861a35f86c45e7" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "num", +] + +[[package]] +name = "arrow-array" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cebfe926794fbc1f49ddd0cdaf898956ca9f6e79541efce62dabccfd81380472" +dependencies = [ + "ahash", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "hashbrown", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0303c7ec4cf1a2c60310fc4d6bbc3350cd051a17bf9e9c0a8e47b4db79277824" +dependencies = [ + "bytes", + "half", + "num", +] + +[[package]] +name = "arrow-cast" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335f769c5a218ea823d3760a743feba1ef7857cba114c01399a891c2fff34285" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "atoi", + "base64", + "chrono", + "comfy-table", + "half", + "lexical-core", + "num", + "ryu", +] + +[[package]] +name = "arrow-csv" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "510db7dfbb4d5761826516cc611d97b3a68835d0ece95b034a052601109c0b1b" +dependencies = [ + "arrow-array", + "arrow-cast", + "arrow-schema", + "chrono", + "csv", + "csv-core", + "lazy_static", + "regex", +] + +[[package]] +name = "arrow-data" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"e8affacf3351a24039ea24adab06f316ded523b6f8c3dbe28fbac5f18743451b" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num", +] + +[[package]] +name = "arrow-ipc" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69880a9e6934d9cba2b8630dd08a3463a91db8693b16b499d54026b6137af284" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "flatbuffers", +] + +[[package]] +name = "arrow-json" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8dafd17a05449e31e0114d740530e0ada7379d7cb9c338fd65b09a8130960b0" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "indexmap", + "lexical-core", + "memchr", + "num", + "serde", + "serde_json", + "simdutf8", +] + +[[package]] +name = "arrow-ord" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "895644523af4e17502d42c3cb6b27cb820f0cb77954c22d75c23a85247c849e1" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", +] + +[[package]] +name = "arrow-row" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9be8a2a4e5e7d9c822b2b8095ecd77010576d824f654d347817640acfc97d229" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", +] + +[[package]] +name = "arrow-schema" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7450c76ab7c5a6805be3440dc2e2096010da58f7cab301fdc996a4ee3ee74e49" +dependencies = [ + "bitflags", +] + +[[package]] +name = "arrow-select" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa5f5a93c75f46ef48e4001535e7b6c922eeb0aa20b73cf58d09e13d057490d8" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + 
"arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "arrow-string" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e7005d858d84b56428ba2a98a107fe88c0132c61793cf6b8232a1f9bfc0452b" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num", + "regex", + "regex-syntax", +] + +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + +[[package]] +name = "bitflags" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" + +[[package]] +name = "blaze-cudf-bridge" +version = "0.1.0" +dependencies = [ + "arrow", + "log", + "num", + "parquet", + "paste", +] + +[[package]] +name = "brotli" +version = "7.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a334ef7c9e23abf0ce748e8cd309037da93e606ad52eb372e4ce327a0dcfbdfd" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + +[[package]] +name = "bumpalo" +version = "3.17.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + +[[package]] +name = "cc" +version = "1.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8691782945451c1c383942c4874dbe63814f61cb57ef773cda2972682b7bb3c0" +dependencies = [ + "jobserver", + "libc", + "shlex", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "chrono" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "windows-link", +] + +[[package]] +name = "comfy-table" +version = "7.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a65ebfec4fb190b6f90e944a817d60499ee0744e582530e2c9900a22e591d9a" +dependencies = [ + "unicode-segmentation", + "unicode-width", +] + +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" 
+dependencies = [ + "getrandom 0.2.16", + "once_cell", + "tiny-keccak", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "crunchy" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929" + +[[package]] +name = "csv" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d02f3b0da4c6504f86e9cd789d8dbafab48c2321be74e9987593de5a894d93d" +dependencies = [ + "memchr", +] + +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "flatbuffers" +version = "25.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1045398c1bfd89168b5fd3f1fc11f6e70b34f6f66300c87d44d3de849463abf1" +dependencies = [ + "bitflags", + "rustc_version", +] + +[[package]] +name = "flate2" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" +dependencies = [ + "crc32fast", + "libz-rs-sys", + "miniz_oxide", +] + +[[package]] +name = "getrandom" +version = "0.2.16" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.11.0+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", +] + +[[package]] +name = "half" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "459196ed295495a68f7d7fe1d84f6c4b7ff0e21fe3017b2f283c6fac3ad803c9" +dependencies = [ + "cfg-if", + "crunchy", + "num-traits", +] + +[[package]] +name = "hashbrown" +version = "0.15.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" + +[[package]] +name = "iana-time-zone" +version = "0.1.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "indexmap" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" +dependencies = [ + "equivalent", + "hashbrown", +] + +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + +[[package]] +name = "jobserver" +version = "0.1.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a" +dependencies = [ + "getrandom 0.3.3", + "libc", +] + +[[package]] +name = "js-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] +name = "lexical-core" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b765c31809609075565a70b4b71402281283aeda7ecaf4818ac14a7b2ade8958" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de6f9cb01fb0b08060209a057c048fcbab8717b4c1ecd2eac66ebfe39a65b0f2" +dependencies = [ + "lexical-parse-integer", + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72207aae22fc0a121ba7b6d479e42cbfea549af1479c3f3a4f12c70dd66df12e" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "1.0.6" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a82e24bf537fd24c177ffbbdc6ebcc8d54732c35b50a3f28cc3f4e4c949a0b3" +dependencies = [ + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5afc668a27f460fb45a81a757b6bf2f43c2d7e30cb5a2dcd3abf294c78d62bd" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "629ddff1a914a836fb245616a7888b62903aae58fa771e1d83943035efa0f978" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "libc" +version = "0.2.172" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" + +[[package]] +name = "libm" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" + +[[package]] +name = "libz-rs-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6489ca9bd760fe9642d7644e827b0c9add07df89857b0416ee15c1cc1a3b8c5a" +dependencies = [ + "zlib-rs", +] + +[[package]] +name = "log" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" + +[[package]] +name = "lz4_flex" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" +dependencies = [ + "twox-hash 1.6.3", +] + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a" +dependencies = [ + "adler2", +] + +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", + "libm", +] + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + +[[package]] +name = "parquet" +version = "55.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd31a8290ac5b19f09ad77ee7a1e6a541f1be7674ad410547d5f1eef6eef4a9c" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64", + "brotli", + "bytes", + "chrono", + "flate2", + "half", + "hashbrown", + "lz4_flex", + "num", + "num-bigint", + "paste", + "seq-macro", + "simdutf8", + "snap", + "thrift", + "twox-hash 2.1.0", + "zstd", +] + +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + +[[package]] +name = "proc-macro2" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" + +[[package]] +name = "regex" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" + +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + +[[package]] +name = "rustversion" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2" + +[[package]] +name = "ryu" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + +[[package]] +name = "semver" +version = "1.0.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" + +[[package]] +name = "seq-macro" +version = "0.3.6" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc" + +[[package]] +name = "serde" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.140" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "simdutf8" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" + +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + +[[package]] +name = "syn" +version = "2.0.101" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thrift" +version = 
"0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float", +] + +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + +[[package]] +name = "twox-hash" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7b17f197b3050ba473acf9181f7b1d3b66d1cf7356c6cc57886662276e65908" + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + +[[package]] +name = "unicode-width" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "wasi" +version = 
"0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" +dependencies = [ + "bumpalo", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "windows-core" +version = "0.61.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + 
+[[package]] +name = "windows-implement" +version = "0.60.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-link" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" + +[[package]] +name = "windows-result" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c64fd11a4fd95df68efcfee5f44a294fe71b8bc6a91993e2791938abcc712252" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97" +dependencies = [ + "windows-link", +] + +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags", +] + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zlib-rs" 
+version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "868b928d7949e09af2f6086dfc1e01936064cc7a819253bce650d4e2a2d63ba8" + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.15+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237" +dependencies = [ + "cc", + "pkg-config", +]
diff --git a/native-engine/blaze-cudf-bridge/Cargo.toml b/native-engine/blaze-cudf-bridge/Cargo.toml new file mode 100644 index 0000000..5f6d5bc --- /dev/null +++ b/native-engine/blaze-cudf-bridge/Cargo.toml
@@ -0,0 +1,14 @@ +[package] +name = "blaze-cudf-bridge" +version = "0.1.0" +edition = "2021" +resolver = "1" + +[dependencies] +arrow = { workspace = true } +datafusion = { workspace = true } +datafusion-ext-commons = { workspace = true } +parquet = { workspace = true } +log = "0.4.27" +num = "0.4.3" +paste = "1.0.15"
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/.clang-format b/native-engine/blaze-cudf-bridge/bridge-cpp/.clang-format new file mode 100644 index 0000000..5dc170d --- /dev/null +++ b/native-engine/blaze-cudf-bridge/bridge-cpp/.clang-format
@@ -0,0 +1,20 @@ +BasedOnStyle: Chromium + +## custom style +AccessModifierOffset: 0 +AlignAfterOpenBracket: AlwaysBreak +AllowAllParametersOfDeclarationOnNextLine: true +AllowShortBlocksOnASingleLine: false +AllowShortFunctionsOnASingleLine: false +BinPackArguments: true +BinPackParameters: OnePerLine +BreakConstructorInitializers: AfterColon +BreakTemplateDeclarations: true +ColumnLimit: 100 +ContinuationIndentWidth: 4 +IncludeBlocks: Merge +IndentAccessModifiers: false +IndentCaseBlocks: false +IndentWidth: 4 +InsertBraces: true +InsertNewlineAtEOF: true
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/CMakeLists.txt b/native-engine/blaze-cudf-bridge/bridge-cpp/CMakeLists.txt new file mode 100644 index 0000000..f7cad96 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/bridge-cpp/CMakeLists.txt
@@ -0,0 +1,31 @@
+cmake_minimum_required(VERSION 3.15)
+
+# The vcpkg toolchain installs manifest dependencies during the first project()
+# call, so variables it consumes (e.g. VCPKG_OVERLAY_PORTS) must be set before it.
+set(VCPKG_BUILD_TYPE release)
+set(VCPKG_OVERLAY_PORTS "${CMAKE_SOURCE_DIR}/vcpkg_ports")
+
+project(blaze-cudf-bridge)
+
+set(CMAKE_CXX_STANDARD 17)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+find_package(Arrow CONFIG REQUIRED)
+find_package(CUDAToolkit REQUIRED)
+find_package(cudf REQUIRED)
+find_package(spdlog CONFIG REQUIRED)
+
+file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS "src/*.cpp")
+list(FILTER SOURCES EXCLUDE REGEX ".*_test\\.cpp$")
+add_library(${PROJECT_NAME} SHARED ${SOURCES})
+
+# sources include project headers as "src/...", relative to this directory
+target_include_directories(${PROJECT_NAME} PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}")
+
+target_link_libraries(${PROJECT_NAME}
+    PRIVATE
+    Arrow::arrow_static
+    CUDA::cudart
+    cudf::cudf
+    spdlog::spdlog
+)
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/CMakePresets.json b/native-engine/blaze-cudf-bridge/bridge-cpp/CMakePresets.json new file mode 100644 index 0000000..c394626 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/bridge-cpp/CMakePresets.json
@@ -0,0 +1,15 @@ +{ + "version": 2, + "configurePresets": [ + { + "name": "vcpkg", + "generator": "Ninja", + "binaryDir": "${sourceDir}/build", + "cacheVariables": { + "CMAKE_TOOLCHAIN_FILE": "$env{VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake", + "VCPKG_INSTALLED_DIR": "${sourceDir}/vcpkg_installed", + "VCPKG_INSTALL_OPTIONS": "--allow-unsupported" + } + } + ] +}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/build.sh b/native-engine/blaze-cudf-bridge/bridge-cpp/build.sh new file mode 100755 index 0000000..5caa859 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/bridge-cpp/build.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+set -x
+set -e
+
+ROOT_DIR="$(cd "$(dirname "$0")" && pwd)"
+
+if [ ! -d "$VCPKG_ROOT" ]; then
+    echo "invalid VCPKG_ROOT: $VCPKG_ROOT" >&2
+    exit 1
+fi
+
+# `cmake --preset` reads CMakePresets.json from the current directory, and
+# ./vcpkg_ports and ./build are relative paths: always run from the script directory.
+cd "$ROOT_DIR"
+
+cmake --preset=vcpkg \
+    -DCMAKE_BUILD_TYPE=Release \
+    -DVCPKG_BUILD_TYPE=release \
+    -DVCPKG_OVERLAY_PORTS=./vcpkg_ports \
+    -DCMAKE_EXPORT_COMPILE_COMMANDS=1
+
+cmake --build build
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/expr.cpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/expr.cpp new file mode 100644 index 0000000..7491537 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/expr.cpp
@@ -0,0 +1,113 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "expr.hpp"
+#include <cstddef>
+#include <cstdint>
+#include <cudf/ast/expressions.hpp>
+#include <cudf/scalar/scalar.hpp>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <typeinfo>
+#include <utility>
+
+namespace {
+
+// Appends `scalar` to `tree` as a literal when its dynamic type is exactly
+// cudf::numeric_scalar<T>; otherwise returns false and leaves `tree` untouched.
+template <typename T>
+bool try_emplace_numeric_literal(cudf::ast::tree& tree, cudf::scalar& scalar) {
+    if (typeid(scalar) != typeid(cudf::numeric_scalar<T>)) {
+        return false;
+    }
+    tree.emplace<cudf::ast::literal>(dynamic_cast<cudf::numeric_scalar<T>&>(scalar));
+    return true;
+}
+
+}  // namespace
+
+// Appends a reference to input column `column_idx`; returns the new expression id.
+size_t ExprBuilder::add_column_reference(size_t column_idx) {
+    auto expr_id = tree.size();
+    tree.emplace<cudf::ast::column_reference>(column_idx);
+    return expr_id;
+}
+
+// Appends a reference to the input column named `name`; returns the new expression id.
+size_t ExprBuilder::add_column_name_reference(std::string const& name) {
+    auto expr_id = tree.size();
+    tree.emplace<cudf::ast::column_name_reference>(name);
+    return expr_id;
+}
+
+// Appends a unary operation on a previously added expression. `op` is cast to
+// cudf::ast::ast_operator as-is (not validated).
+size_t ExprBuilder::add_unary_operation(size_t arg1_expr_id, int op) {
+    auto const& arg1 = tree.at(arg1_expr_id);
+    auto expr_id = tree.size();
+    tree.emplace<cudf::ast::operation>((cudf::ast::ast_operator)op, arg1);
+    return expr_id;
+}
+
+// Appends a binary operation on two previously added expressions. `op` is cast to
+// cudf::ast::ast_operator as-is (not validated).
+size_t ExprBuilder::add_binary_operation(size_t arg1_expr_id, size_t arg2_expr_id, int op) {
+    auto const& arg1 = tree.at(arg1_expr_id);
+    auto const& arg2 = tree.at(arg2_expr_id);
+    auto expr_id = tree.size();
+    tree.emplace<cudf::ast::operation>((cudf::ast::ast_operator)op, arg1, arg2);
+    return expr_id;
+}
+
+// Appends a literal for a numeric or string scalar; returns the new expression id.
+// Throws std::runtime_error if `scalar` is null or of an unsupported type.
+size_t ExprBuilder::add_literal(std::shared_ptr<cudf::scalar> scalar) {
+    if (!scalar) {
+        throw std::runtime_error("unsupported scalar: null");
+    }
+    auto& scalar_value = *scalar;
+    auto expr_id = tree.size();
+
+    bool added = try_emplace_numeric_literal<int8_t>(tree, scalar_value) ||
+                 try_emplace_numeric_literal<int16_t>(tree, scalar_value) ||
+                 try_emplace_numeric_literal<int32_t>(tree, scalar_value) ||
+                 try_emplace_numeric_literal<int64_t>(tree, scalar_value) ||
+                 try_emplace_numeric_literal<uint8_t>(tree, scalar_value) ||
+                 try_emplace_numeric_literal<uint16_t>(tree, scalar_value) ||
+                 try_emplace_numeric_literal<uint32_t>(tree, scalar_value) ||
+                 try_emplace_numeric_literal<uint64_t>(tree, scalar_value) ||
+                 try_emplace_numeric_literal<float>(tree, scalar_value) ||
+                 try_emplace_numeric_literal<double>(tree, scalar_value);
+
+    if (!added && typeid(scalar_value) == typeid(cudf::string_scalar)) {
+        tree.emplace<cudf::ast::literal>(dynamic_cast<cudf::string_scalar&>(scalar_value));
+        added = true;
+    }
+    if (!added) {
+        throw std::runtime_error(
+            std::string("unsupported scalar type: ") + typeid(scalar_value).name());
+    }
+
+    // The literal was constructed from a reference to the scalar, so retain the
+    // scalar in scalar_pool to keep it alive as long as this builder exists.
+    scalar_pool.push_back(std::move(scalar));
+    return expr_id;
+}
+
+// Returns the most recently added expression, which is what callers evaluate.
+// NOTE(review): not guarded against an empty builder; confirm callers always add one.
+cudf::ast::expression const& ExprBuilder::expr() const {
+    return tree.back();
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/expr.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/expr.hpp new file mode 100644 index 0000000..b852847 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/expr.hpp
@@ -0,0 +1,41 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cudf/ast/expressions.hpp>
+#include <cudf/scalar/scalar.hpp>
+#include <memory>
+#include <string>
+#include <vector>
+
+// Incrementally builds a cudf AST expression tree. Every add_* call appends one node
+// and returns its id (its index in the tree), which later calls use to refer to it
+// as an operand. expr() returns the most recently added node.
+class ExprBuilder {
+    public:
+    size_t add_unary_operation(size_t arg1_expr_id, int op);
+    size_t add_binary_operation(size_t arg1_expr_id, size_t arg2_expr_id, int op);
+    size_t add_column_reference(size_t column_idx);
+    size_t add_column_name_reference(std::string const& name);
+    size_t add_literal(std::shared_ptr<cudf::scalar> scalar);
+
+    cudf::ast::expression const& expr() const;
+
+    private:
+    cudf::ast::tree tree;
+    // keeps alive the scalars that literal nodes in `tree` were built from
+    std::vector<std::shared_ptr<cudf::scalar>> scalar_pool;
+};
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/ffi.cpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/ffi.cpp new file mode 100644 index 0000000..7ceead7 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/ffi.cpp
@@ -0,0 +1,414 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <arrow/c/abi.h>
+#include <arrow/c/bridge.h>
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <spdlog/spdlog.h>
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cudf/interop.hpp>
+#include <cudf/io/datasource.hpp>
+#include <cudf/scalar/scalar.hpp>
+#include <cudf/table/table.hpp>
+#include <cudf/table/table_view.hpp>
+#include <cudf/utilities/error.hpp>
+#include <iostream>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+#include "src/expr.hpp"
+#include "src/plan/filter.hpp"
+#include "src/plan/parquet_scan.hpp"
+#include "src/plan/plan.hpp"
+#include "src/plan/project.hpp"
+#include "src/plan/split_table.hpp"
+#include "src/plan/union.hpp"
+
+// Heap-allocated box whose pointer is handed across the FFI boundary as an opaque
+// handle; each wrapped type gets an ffi_<Name>_delete function to free it.
+template <typename T>
+struct alignas(8) FFIObjWrapper {
+    FFIObjWrapper(T inner) : inner(std::move(inner)) {
+    }
+    T inner;
+};
+
+// Result passed by value across the FFI boundary. When `ok` is false, `value` only
+// holds the placeholder chosen by the failing function.
+template <typename T>
+struct alignas(8) FFIResult {
+    FFIResult(bool ok, T value) : ok(ok), value(std::move(value)) {
+    }
+    bool ok;
+    T value;
+};
+
+template <>
+struct alignas(8) FFIResult<void> {
+    FFIResult(bool ok) : ok(ok) {
+    }
+    bool ok;
+};
+
+// Declares `name` as a pointer to FFIObjWrapper<ty> and exports ffi_<name>_delete.
+#define define_ffi_obj_wrapped_type(name, ty)       \
+    using name = FFIObjWrapper<ty>*;                \
+    extern "C" void ffi_##name##_delete(name ptr) { \
+        delete ptr;                                 \
+    }
+
+define_ffi_obj_wrapped_type(FFI_PlanPtr, std::shared_ptr<Plan>);
+define_ffi_obj_wrapped_type(FFI_TableStreamPtr, std::unique_ptr<TableStream>);
+define_ffi_obj_wrapped_type(FFI_TablePtr, std::unique_ptr<OwnedTable>);
+define_ffi_obj_wrapped_type(FFI_ScalarPtr, std::shared_ptr<cudf::scalar>);
+define_ffi_obj_wrapped_type(FFI_ExprBuilderPtr, std::shared_ptr<ExprBuilder>);
+
+// Catch clauses shared by every exported function: exceptions must not unwind across
+// the extern "C" boundary, so each one is logged and turned into `ret`. Exceptions are
+// caught by const reference; catching by value would slice them, and a sliced
+// std::exception no longer reports the original what() message.
+#define catch_exceptions_and_return(ret)                                     \
+    catch (cudf::logic_error const& err) {                                   \
+        spdlog::error("cudf logic error({}): {}", __func__, err.what());     \
+        return (ret);                                                        \
+    }                                                                        \
+    catch (cudf::data_type_error const& err) {                               \
+        spdlog::error("cudf data type error({}): {}", __func__, err.what()); \
+        std::cerr << err.stacktrace();                                       \
+        return (ret);                                                        \
+    }                                                                        \
+    catch (cudf::cuda_error const& err) {                                    \
+        spdlog::error("cudf cuda error({}): {}", __func__, err.what());      \
+        std::cerr << err.stacktrace();                                       \
+        return (ret);                                                        \
+    }                                                                        \
+    catch (std::runtime_error const& err) {                                  \
+        spdlog::error("runtime error({}): {}", __func__, err.what());        \
+        return (ret);                                                        \
+    }                                                                        \
+    catch (std::exception const& err) {                                      \
+        spdlog::error("exception({}): {}", __func__, err.what());            \
+        return (ret);                                                        \
+    }                                                                        \
+    catch (...) {                                                            \
+        spdlog::error("unknown exception({})", __func__);                    \
+        return (ret);                                                        \
+    }
+
+// Logs a non-OK arrow status on behalf of the exported function `func`.
+static void log_arrow_error(char const* func, arrow::Status const& status) {
+    spdlog::error("arrow error({}): {}", func, status.ToString());
+}
+
+// Caller-implemented data source: C function pointers plus an opaque `raw` pointer
+// (presumably the caller's state), reachable from each callback through `self`.
+struct alignas(8) FFI_CudfDataSource {
+    void (*dtor)(FFI_CudfDataSource* self);
+    size_t (*data_size)(FFI_CudfDataSource const* self);
+    FFIResult<size_t> (*read)(
+        FFI_CudfDataSource* self, size_t offset, uint8_t* buf, size_t buf_size);
+    void* raw;
+
+    // Wraps a copy of this struct as a cudf datasource whose destructor calls `dtor`.
+    // NOTE(review): ownership of `raw` moves into the returned object; confirm the
+    // caller never releases it separately.
+    std::unique_ptr<cudf::io::datasource> to_cudf_io_datasource() {
+        struct Wrapper : public cudf::io::datasource {
+            public:
+            FFI_CudfDataSource inner;
+
+            Wrapper(FFI_CudfDataSource wrapped) : inner(wrapped) {
+            }
+
+            ~Wrapper() override {
+                inner.dtor(&inner);
+            }
+
+            // Reads [offset, offset + size) into a new host buffer, clamped to the end of
+            // the source. The FFI callback may return fewer bytes than requested, so keep
+            // reading from where the previous call stopped until the buffer is full.
+            std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) override {
+                auto const source_size = this->size();
+                auto const len = offset < source_size ? std::min(size, source_size - offset) : size_t{0};
+                auto buffer = std::vector<uint8_t>(len);
+                size_t total_read_size = 0;
+
+                while (total_read_size < len) {
+                    auto read_size = host_read(
+                        offset + total_read_size,
+                        len - total_read_size,
+                        buffer.data() + total_read_size);
+                    if (read_size == 0) {
+                        throw std::runtime_error("got unexpected EOF while reading datasource");
+                    }
+                    total_read_size += read_size;
+                }
+                return owning_buffer<std::vector<uint8_t>>::create(std::move(buffer));
+            }
+
+            // Reads up to `size` bytes at `offset` into `dst` through the FFI callback and
+            // returns the number of bytes actually read.
+            size_t host_read(size_t offset, size_t size, uint8_t* dst) override {
+                auto read_size_ret = inner.read(&inner, offset, dst, size);
+                if (!read_size_ret.ok) {
+                    throw std::runtime_error("error reading datasource");
+                }
+                return read_size_ret.value;
+            }
+
+            size_t size() const override {
+                return inner.data_size(&inner);
+            }
+        };
+        return std::make_unique<Wrapper>(*this);
+    }
+};
+
+extern "C" FFI_PlanPtr ffi_new_split_table_plan(
+    FFI_PlanPtr input_plan,
+    size_t batch_size) {
+    try {
+        auto plan = std::make_shared<SplitTablePlan>(input_plan->inner, batch_size);
+        return new FFIObjWrapper(std::shared_ptr<Plan>(std::move(plan)));
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" FFI_PlanPtr ffi_new_union_plan(
+    FFI_PlanPtr* input_plans_ptr,
+    size_t input_plans_len,
+    ArrowSchema* ffi_schema) {
+    try {
+        auto input_plans = std::vector<std::shared_ptr<Plan>>();
+        input_plans.reserve(input_plans_len);
+        for (size_t i = 0; i < input_plans_len; i++) {
+            input_plans.push_back(input_plans_ptr[i]->inner);
+        }
+
+        auto schema_ret = arrow::ImportSchema(ffi_schema);
+        if (!schema_ret.ok()) {
+            log_arrow_error(__func__, schema_ret.status());
+            return nullptr;
+        }
+        auto schema = schema_ret.ValueUnsafe();
+
+        auto plan = std::make_shared<UnionPlan>(std::move(input_plans), schema);
+        return new FFIObjWrapper(std::shared_ptr<Plan>(std::move(plan)));
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" FFI_PlanPtr ffi_new_parquet_scan_plan(
+    FFI_CudfDataSource* ffi_datasource,
+    FFI_ExprBuilderPtr expr,
+    size_t read_offset,
+    size_t read_size,
+    ArrowSchema* ffi_read_schema) {
+    try {
+        auto datasource = ffi_datasource->to_cudf_io_datasource();
+        auto read_schema_ret = arrow::ImportSchema(ffi_read_schema);
+        if (!read_schema_ret.ok()) {
+            log_arrow_error(__func__, read_schema_ret.status());
+            return nullptr;
+        }
+        auto read_schema = read_schema_ret.ValueUnsafe();
+
+        auto plan = std::make_shared<ParquetScanPlan>(
+            std::move(datasource), expr->inner, read_offset, read_size, read_schema);
+        return new FFIObjWrapper(std::shared_ptr<Plan>(std::move(plan)));
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" FFI_PlanPtr ffi_new_project_plan(
+    FFI_PlanPtr input_plan,
+    FFI_ExprBuilderPtr* exprs_ptr,
+    size_t exprs_len,
+    ArrowSchema* ffi_schema) {
+    try {
+        auto schema_ret = arrow::ImportSchema(ffi_schema);
+        if (!schema_ret.ok()) {
+            log_arrow_error(__func__, schema_ret.status());
+            return nullptr;
+        }
+        auto schema = schema_ret.MoveValueUnsafe();
+
+        auto exprs = std::vector<std::shared_ptr<ExprBuilder>>();
+        exprs.reserve(exprs_len);
+        for (size_t i = 0; i < exprs_len; i++) {
+            exprs.push_back(exprs_ptr[i]->inner);
+        }
+        auto plan = std::make_shared<ProjectPlan>(input_plan->inner, exprs, schema);
+        return new FFIObjWrapper(std::shared_ptr<Plan>(std::move(plan)));
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" FFI_PlanPtr ffi_new_filter_plan(FFI_PlanPtr input_plan, FFI_ExprBuilderPtr expr) {
+    try {
+        auto plan = std::make_shared<FilterPlan>(input_plan->inner, expr->inner);
+        return new FFIObjWrapper(std::shared_ptr<Plan>(std::move(plan)));
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" FFIResult<FFI_TableStreamPtr> ffi_execute_plan(FFI_PlanPtr plan) {
+    try {
+        spdlog::info("cudf executing plan:\n{}", desc_plan_tree(plan->inner));
+        auto table_stream_ret = plan->inner->execute();
+        if (!table_stream_ret.ok()) {
+            log_arrow_error(__func__, table_stream_ret.status());
+            return FFIResult(false, (FFI_TableStreamPtr) nullptr);
+        }
+        return FFIResult(true, new FFIObjWrapper(std::move(table_stream_ret.ValueUnsafe())));
+    }
+    catch_exceptions_and_return(FFIResult(false, (FFI_TableStreamPtr) nullptr));
+}
+
+extern "C" FFIResult<bool> ffi_table_stream_has_next(FFI_TableStreamPtr table_stream) {
+    try {
+        auto has_next_ret = table_stream->inner->has_next();
+        if (!has_next_ret.ok()) {
+            log_arrow_error(__func__, has_next_ret.status());
+            return FFIResult(false, false);
+        }
+        return FFIResult(true, has_next_ret.ValueUnsafe());
+    }
+    catch_exceptions_and_return(FFIResult(false, false));
+}
+
+extern "C" FFIResult<FFI_TablePtr> ffi_table_stream_next(FFI_TableStreamPtr table_stream) {
+    try {
+        auto next_ret = table_stream->inner->next();
+        if (!next_ret.ok()) {
+            log_arrow_error(__func__, next_ret.status());
+            return FFIResult(false, (FFI_TablePtr) nullptr);
+        }
+        return FFIResult(
+            true, new FFIObjWrapper(std::make_unique<OwnedTable>(next_ret.MoveValueUnsafe())));
+    }
+    catch_exceptions_and_return(FFIResult(false, (FFI_TablePtr) nullptr));
+}
+
+extern "C" FFIResult<void> ffi_table_to_arrow(
+    FFI_TablePtr table, ArrowSchema* ffi_schema, ArrowArray* ffi_array) {
+    try {
+        auto schema_ret = arrow::ImportType(ffi_schema);
+        if (!schema_ret.ok()) {
+            log_arrow_error(__func__, schema_ret.status());
+            return FFIResult<void>(false);
+        }
+        auto schema = schema_ret.ValueUnsafe();
+
+        auto device_table = cudf::to_arrow_host(table->inner->view());
+        auto arrow_table_ret = arrow::ImportDeviceArray(device_table.get(), schema);
+        if (!arrow_table_ret.ok()) {
+            log_arrow_error(__func__, arrow_table_ret.status());
+            return FFIResult<void>(false);
+        }
+        auto arrow_table = arrow_table_ret.ValueUnsafe();
+
+        auto export_status = arrow::ExportArray(*arrow_table, ffi_array);
+        if (!export_status.ok()) {
+            log_arrow_error(__func__, export_status);
+            return FFIResult<void>(false);
+        }
+        return FFIResult<void>(true);
+    }
+    catch_exceptions_and_return(FFIResult<void>(false));
+}
+
+// Exports ffi_<rust_ty>_scalar(c_ty), which boxes a cudf::numeric_scalar<c_ty>.
+#define ffi_def_numeric_scalar(rust_ty, c_ty)                                           \
+    extern "C" FFI_ScalarPtr ffi_##rust_ty##_scalar(c_ty value) {                       \
+        try {                                                                           \
+            auto scalar = std::make_shared<cudf::numeric_scalar<c_ty>>(value);          \
+            return new FFIObjWrapper(std::shared_ptr<cudf::scalar>(std::move(scalar))); \
+        }                                                                               \
+        catch_exceptions_and_return(nullptr);                                           \
+    }
+
+ffi_def_numeric_scalar(i8, int8_t);
+ffi_def_numeric_scalar(i16, int16_t);
+ffi_def_numeric_scalar(i32, int32_t);
+ffi_def_numeric_scalar(i64, int64_t);
+ffi_def_numeric_scalar(u8, uint8_t);
+ffi_def_numeric_scalar(u16, uint16_t);
+ffi_def_numeric_scalar(u32, uint32_t);
+ffi_def_numeric_scalar(u64, uint64_t);
+ffi_def_numeric_scalar(f32, float);
+ffi_def_numeric_scalar(f64, double);
+
+extern "C" FFI_ScalarPtr ffi_string_scalar(char const* value) {
+    try {
+        auto scalar = std::make_shared<cudf::string_scalar>(std::string(value));
+        return new FFIObjWrapper(std::shared_ptr<cudf::scalar>(std::move(scalar)));
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+extern "C" FFI_ExprBuilderPtr ffi_new_expr_builder() {
+    try {
+        return new FFIObjWrapper(std::make_shared<ExprBuilder>());
+    }
+    catch_exceptions_and_return(nullptr);
+}
+
+// The ffi_expr_builder_add_* functions return the new expression id, or
+// (size_t)-1 (SIZE_MAX) on failure.
+extern "C" size_t ffi_expr_builder_add_literal(
+    FFI_ExprBuilderPtr expr_builder, FFI_ScalarPtr scalar_value) {
+    try {
+        return expr_builder->inner->add_literal(scalar_value->inner);
+    }
+    catch_exceptions_and_return(-1);
+}
+
+extern "C" size_t ffi_expr_builder_add_column_reference(
+    FFI_ExprBuilderPtr expr_builder, size_t column_idx) {
+    try {
+        return expr_builder->inner->add_column_reference(column_idx);
+    }
+    catch_exceptions_and_return(-1);
+}
+
+extern "C" size_t ffi_expr_builder_add_column_name_reference(
+    FFI_ExprBuilderPtr expr_builder, char const* name) {
+    try {
+        return expr_builder->inner->add_column_name_reference(name);
+    }
+    catch_exceptions_and_return(-1);
+}
+
+extern "C" size_t ffi_expr_builder_add_unary_expr(
+    FFI_ExprBuilderPtr expr_builder, size_t arg1_expr_id, int32_t op) {
+    try {
+        return expr_builder->inner->add_unary_operation(arg1_expr_id, op);
+    }
+    catch_exceptions_and_return(-1);
+}
+
+extern "C" size_t ffi_expr_builder_add_binary_expr(
+    FFI_ExprBuilderPtr expr_builder, size_t arg1_expr_id, size_t arg2_expr_id, int32_t op) {
+    try {
+        return expr_builder->inner->add_binary_operation(arg1_expr_id, arg2_expr_id, op);
+    }
+    catch_exceptions_and_return(-1);
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/filter.cpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/filter.cpp new file mode 100644 index 0000000..74c44b4 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/filter.cpp
@@ -0,0 +1,64 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "filter.hpp"
+#include <arrow/result.h>
+#include <cudf/stream_compaction.hpp>
+#include <cudf/transform.hpp>
+#include <memory>
+#include <ostream>
+#include <utility>
+#include "src/plan/plan.hpp"
+#include "src/util.hpp"
+
+// Evaluates the predicate on each table pulled from `input` and emits only the rows
+// selected by the resulting boolean mask. Tables left with zero rows are still emitted.
+class FilterStream : public TableStream {
+    public:
+    FilterStream(std::unique_ptr<TableStream> input, std::shared_ptr<ExprBuilder> expr) :
+        input(std::move(input)), expr(std::move(expr)) {
+    }
+
+    arrow::Result<bool> has_next() override {
+        return input->has_next();
+    }
+
+    arrow::Result<OwnedTable> next() override {
+        auto next_input = ARROW_TRY_RESULT(input->next());
+        auto filtered_mask = cudf::compute_column(next_input.view(), expr->expr());
+        auto filtered_table = cudf::apply_boolean_mask(next_input.view(), *filtered_mask);
+        return arrow::Result(std::move(filtered_table));
+    }
+
+    private:
+    std::unique_ptr<TableStream> input;
+    std::shared_ptr<ExprBuilder> expr;
+};
+
+FilterPlan::FilterPlan(std::shared_ptr<Plan> input, std::shared_ptr<ExprBuilder> expr) :
+    input(std::move(input)), expr(std::move(expr)) {
+}
+
+void FilterPlan::desc(std::ostream& oss) const {
+    oss << "CudfFilter []";
+}
+
+std::shared_ptr<arrow::Schema> FilterPlan::schema() const {
+    return input->schema();
+}
+
+arrow::Result<std::unique_ptr<TableStream>> FilterPlan::execute() const {
+    auto input_stream = ARROW_TRY_RESULT(input->execute());
+    return arrow::Result(std::make_unique<FilterStream>(std::move(input_stream), expr));
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/filter.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/filter.hpp new file mode 100644 index 0000000..d4bfb80 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/filter.hpp
@@ -0,0 +1,43 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <arrow/type.h>
+#include <cudf/ast/expressions.hpp>
+#include <memory>
+#include <ostream>
+#include <vector>
+#include "src/expr.hpp"
+#include "src/plan/plan.hpp"
+
+// Plan node that filters the rows of its single input with the predicate built by
+// `expr`; its output schema is the input's schema.
+class FilterPlan : public Plan {
+    public:
+    FilterPlan(std::shared_ptr<Plan> input, std::shared_ptr<ExprBuilder> expr);
+
+    void desc(std::ostream& oss) const;
+    std::shared_ptr<arrow::Schema> schema() const;
+    arrow::Result<std::unique_ptr<TableStream>> execute() const;
+
+    // the one and only child is the input plan
+    std::vector<std::shared_ptr<Plan>> children() const {
+        return {input};
+    }
+
+    private:
+    std::shared_ptr<Plan> input;
+    std::shared_ptr<ExprBuilder> expr;
+};
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/parquet_scan.cpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/parquet_scan.cpp new file mode 100644 index 0000000..81a703f --- /dev/null +++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/parquet_scan.cpp
@@ -0,0 +1,201 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "parquet_scan.hpp"
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <arrow/type_fwd.h>
+#include <spdlog/fmt/ranges.h>
+#include <spdlog/spdlog.h>
+#include <strings.h>
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <cudf/ast/expressions.hpp>
+#include <cudf/column/column.hpp>
+#include <cudf/io/datasource.hpp>
+#include <cudf/io/parquet.hpp>
+#include <cudf/io/parquet_metadata.hpp>
+#include <cudf/io/types.hpp>
+#include <cudf/types.hpp>
+#include <memory>
+#include <optional>
+#include <regex>
+#include <stdexcept>
+#include <typeinfo>
+#include <utility>
+#include <vector>
+#include "src/expr.hpp"
+#include "src/plan/plan.hpp"
+
+// Streams the tables produced by a cudf chunked parquet reader. Chunks without any
+// rows are skipped, so every table returned by next() is non-empty.
+class ParquetScanTableStream : public TableStream {
+    public:
+    ParquetScanTableStream(std::shared_ptr<cudf::io::chunked_parquet_reader> reader) :
+        reader(std::move(reader)) {
+        // according to cudf docs, the first chunk always exists (it may be empty)
+        stash_if_non_empty(this->reader->read_chunk().tbl);
+    }
+
+    arrow::Result<bool> has_next() override {
+        // keep pulling until a non-empty chunk is found or the reader is exhausted
+        while (!next_table.has_value() && reader->has_next()) {
+            stash_if_non_empty(reader->read_chunk().tbl);
+        }
+        return arrow::Result(next_table.has_value());
+    }
+
+    arrow::Result<OwnedTable> next() override {
+        if (!next_table.has_value()) {
+            return arrow::Status::Invalid("no more parquet tables to read");
+        }
+        auto table = std::move(next_table).value();
+        next_table = std::nullopt;
+        return arrow::Result(std::move(table));
+    }
+
+    private:
+    // Stores `table` as the next table to emit unless it has no rows.
+    void stash_if_non_empty(std::unique_ptr<cudf::table> table) {
+        if (table->num_rows() > 0) {
+            next_table = std::move(table);
+        }
+    }
+
+    std::shared_ptr<cudf::io::chunked_parquet_reader> reader;
+    std::optional<std::unique_ptr<cudf::table>> next_table;
+};
+
+ParquetScanPlan::ParquetScanPlan(
+    std::unique_ptr<cudf::io::datasource> datasource,
+    std::shared_ptr<ExprBuilder> predicate_filter_expr,
+    size_t read_offset,
+    size_t read_size,
+    std::shared_ptr<arrow::Schema> schema) :
+    datasource(std::move(datasource)),
+    predicate_filter_expr(std::move(predicate_filter_expr)),
+    read_offset(read_offset),
+    read_size(read_size),
+    output_schema(std::move(schema)) {
+}
+
+void ParquetScanPlan::desc(std::ostream& oss) const {
+    oss << "CudfParquetScan [";
+    oss << std::regex_replace(output_schema->ToString(), std::regex("\n"), ", ");
+    oss << "]";
+}
+
+std::shared_ptr<arrow::Schema> ParquetScanPlan::schema() const {
+    return output_schema;
+}
+
+// Builds one reader_column_schema per top-level parquet column (in file order): string
+// output fields (matched case-insensitively by name) read binary data as strings.
+// Throws std::runtime_error for struct output fields, which are not implemented.
+inline static std::vector<cudf::io::reader_column_schema> build_reader_column_schema(
+    arrow::Schema const& output_schema, cudf::io::parquet_metadata const& metadata) {
+    auto column_schema = std::vector<cudf::io::reader_column_schema>();
+    auto& fields = output_schema.fields();
+
+    for (auto const& col : metadata.schema().root().children()) {
+        auto field = std::find_if(fields.begin(), fields.end(), [&](auto const& f) {
+            return strcasecmp(f->name().c_str(), col.name().c_str()) == 0;
+        });
+        if (field == fields.end()) {
+            column_schema.emplace_back();
+        } else if (field->get()->type()->id() == arrow::Type::STRING) {
+            column_schema.emplace_back().set_convert_binary_to_strings(true);
+        } else if (field->get()->type()->id() == arrow::Type::STRUCT) {
+            throw std::runtime_error("complex type not implemented");
+        } else {
+            column_schema.emplace_back();
+        }
+    }
+    return column_schema;
+}
+
+// Selects the row groups assigned to the split [read_offset, read_offset + read_size).
+// Each row group's cumulative end (by "total_byte_size") is mapped proportionally onto
+// the data source size, and the row group is selected when that mapped end falls in
+// (read_offset, read_offset + read_size]; contiguous, non-overlapping splits therefore
+// select disjoint sets of row groups.
+static std::vector<cudf::size_type> select_row_groups(
+    cudf::io::parquet_metadata const& metadata,
+    uint64_t datasource_total_size,
+    uint64_t read_offset,
+    uint64_t read_size) {
+    auto const& row_groups = metadata.rowgroup_metadata();
+    auto const num_row_groups = metadata.num_rowgroups();
+
+    uint64_t file_total_byte_size = 0;
+    for (cudf::size_type i = 0; i < num_row_groups; i++) {
+        file_total_byte_size += row_groups[i].at("total_byte_size");
+    }
+
+    auto selected = std::vector<cudf::size_type>();
+    if (file_total_byte_size == 0) {  // nothing to map; also avoids dividing by zero
+        return selected;
+    }
+
+    uint64_t end = 0;
+    for (cudf::size_type i = 0; i < num_row_groups; i++) {
+        end += row_groups[i].at("total_byte_size");
+        // end * datasource_total_size can exceed 64 bits for multi-GiB files, so use a
+        // 128-bit intermediate (GCC/Clang extension); results are unchanged otherwise.
+        auto const mapped_end = static_cast<uint64_t>(
+            static_cast<unsigned __int128>(end) * datasource_total_size / file_total_byte_size);
+        if (mapped_end > read_offset && mapped_end <= read_offset + read_size) {
+            selected.push_back(i);
+        }
+    }
+    return selected;
+}
+
+arrow::Result<std::unique_ptr<TableStream>> ParquetScanPlan::execute() const {
+    auto datasource_total_size = datasource->size();
+    auto source_info = cudf::io::source_info(&*datasource);
+    auto metadata = cudf::io::read_parquet_metadata(source_info);
+
+    auto read_row_group_ids =
+        select_row_groups(metadata, datasource_total_size, read_offset, read_size);
+    if (read_row_group_ids.empty()) {  // no row groups to read
+        return TableStream::empty_stream();
+    }
+    spdlog::info("reading parquet row groups: {}", read_row_group_ids);
+    spdlog::info("reading parquet fields: {}", output_schema->field_names());
+
+    auto reader_column_schema = build_reader_column_schema(*this->output_schema, metadata);
+    auto options_builder =
+        cudf::io::parquet_reader_options_builder(source_info)
+            .row_groups(std::vector<std::vector<cudf::size_type>>{std::move(read_row_group_ids)})
+            .set_column_schema(reader_column_schema)
+            .columns(output_schema->field_names())
+            .allow_mismatched_pq_schemas(true);
+
+    // only push the predicate down to the reader when its root is an operation; other
+    // roots (e.g. a bare literal or column reference) are not passed as a filter
+    auto const& pred = predicate_filter_expr->expr();
+    if (typeid(pred) == typeid(cudf::ast::operation)) {
+        options_builder.filter(pred);
+    }
+    auto options = options_builder.build();
+
+    // 16 MiB, passed to cudf's chunked reader as its chunk size limit
+    constexpr size_t chunk_bytes_size = 16 * 1024 * 1024;
+    auto reader = std::make_shared<cudf::io::chunked_parquet_reader>(chunk_bytes_size, options);
+    return std::make_unique<ParquetScanTableStream>(reader);
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/parquet_scan.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/parquet_scan.hpp new file mode 100644 index 0000000..b14b378 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/parquet_scan.hpp
@@ -0,0 +1,48 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <arrow/type.h>
+#include <cstddef>
+#include <cudf/ast/expressions.hpp>
+#include <cudf/io/datasource.hpp>
+#include <memory>
+#include <ostream>
+#include "src/expr.hpp"
+#include "src/plan/plan.hpp"
+
+// Leaf plan that scans the parquet row groups assigned to the split
+// [read_offset, read_offset + read_size) of `datasource`, optionally pushing down the
+// predicate built by `predicate_filter_expr`, and outputs the columns of `schema`.
+class ParquetScanPlan : public Plan {
+    public:
+    ParquetScanPlan(
+        std::unique_ptr<cudf::io::datasource> datasource,
+        std::shared_ptr<ExprBuilder> predicate_filter_expr,
+        size_t read_offset,
+        size_t read_size,
+        std::shared_ptr<arrow::Schema> schema);
+
+    void desc(std::ostream& oss) const;
+    std::shared_ptr<arrow::Schema> schema() const;
+    arrow::Result<std::unique_ptr<TableStream>> execute() const;
+
+    private:
+    std::unique_ptr<cudf::io::datasource> datasource;
+    std::shared_ptr<ExprBuilder> predicate_filter_expr;
+    size_t read_offset;
+    size_t read_size;
+    std::shared_ptr<arrow::Schema> output_schema;
+};
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/plan.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/plan.hpp
new file mode 100644
index 0000000..852bd9d
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/plan.hpp
@@ -0,0 +1,147 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <cudf/table/table.hpp>
+#include <cudf/table/table_view.hpp>
+#include <memory>
+#include <optional>
+#include <ostream>
+#include <sstream>
+#include <stack>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+
+// A table yielded by a TableStream: either owns a cudf::table, or is a view
+// into a table kept alive by a shared `holder` (SplitTableStream uses this to
+// hand out zero-copy slices of one input table).
+class OwnedTable {
+ public:
+  // Takes ownership of `owned`.
+  inline OwnedTable(std::unique_ptr<cudf::table> owned) :
+      owned(std::make_optional(std::move(owned))) {
+  }
+  // Views `referenced`; `holder` is retained so the viewed data stays alive.
+  inline OwnedTable(std::shared_ptr<OwnedTable> holder, cudf::table_view referenced) :
+      referenced(std::make_pair(holder, referenced)) {
+  }
+  inline OwnedTable(OwnedTable&& moving) :
+      owned(std::move(moving.owned)), referenced(std::move(moving.referenced)) {
+  }
+
+  inline cudf::table_view view() const {
+    if (owned.has_value()) {
+      return owned.value()->view();
+    }
+    return referenced.value().second;
+  }
+
+ private:
+  std::optional<std::unique_ptr<cudf::table>> owned;
+  std::optional<std::pair<std::shared_ptr<OwnedTable>, cudf::table_view>> referenced;
+};
+
+// Pull-based iterator over tables: call has_next() and, if it yields true, next().
+class TableStream {
+ public:
+  TableStream() {
+  }
+
+  virtual ~TableStream() {
+  }
+
+ public:
+  virtual arrow::Result<bool> has_next() = 0;
+  virtual arrow::Result<OwnedTable> next() = 0;
+
+  // Returns a stream that yields no tables.
+  inline static std::unique_ptr<TableStream> empty_stream() {
+    class EmptyTableStream : public TableStream {
+     public:
+      inline arrow::Result<bool> has_next() override {
+        return arrow::Result(false);
+      }
+
+      // Report misuse as an error Status rather than throwing: streams are
+      // driven through extern "C" entry points (e.g. ffi_table_stream_next)
+      // that signal failure via an `ok` flag, not via C++ exceptions.
+      inline arrow::Result<OwnedTable> next() override {
+        return arrow::Status::Invalid("unreachable: EmptyTableStream has no tables");
+      }
+    };
+    return std::make_unique<EmptyTableStream>();
+  }
+};
+
+// A node of the libcudf execution plan tree.
+class Plan {
+ public:
+  Plan() {
+  }
+  virtual ~Plan() {
+  }
+
+ public:
+  // Writes a short description of this node alone; desc_plan_tree adds indent + newline.
+  virtual void desc(std::ostream& oss) const = 0;
+  virtual std::shared_ptr<arrow::Schema> schema() const = 0;
+  virtual arrow::Result<std::unique_ptr<TableStream>> execute() const = 0;
+
+  // Direct inputs of this node, in order; the default (leaf) has none.
+  virtual std::vector<std::shared_ptr<Plan>> children() const {
+    return std::vector<std::shared_ptr<Plan>>{};
+  }
+};
+
+// Renders the tree rooted at `plan` as one line per node in pre-order, each
+// line indented by the node's depth, siblings in children() order.
+inline static std::string desc_plan_tree(std::shared_ptr<Plan> plan) {
+  struct plan_tree_node {
+    std::shared_ptr<Plan> plan;
+    int level;
+    plan_tree_node(std::shared_ptr<Plan> plan, int level) : plan(plan), level(level) {
+    }
+  };
+  auto dfs_stack = std::stack<plan_tree_node>();
+  auto nodes = std::vector<plan_tree_node>{};
+
+  dfs_stack.push(plan_tree_node(plan, 0));
+  while (!dfs_stack.empty()) {
+    auto node = dfs_stack.top();
+    nodes.push_back(node);
+    dfs_stack.pop();
+    // Push children in reverse so the LIFO stack visits them in declared
+    // order (pushing them forwards printed siblings back to front).
+    auto children = node.plan->children();
+    for (auto it = children.rbegin(); it != children.rend(); ++it) {
+      dfs_stack.push(plan_tree_node(*it, node.level + 1));
+    }
+  }
+
+  std::ostringstream oss;
+  for (auto node : nodes) {
+    for (auto indent = 0; indent < 2 * node.level; indent++) {
+      oss << " ";
+    }
+    node.plan->desc(oss);
+    oss << std::endl;
+  }
+  return oss.str();
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/project.cpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/project.cpp
new file mode 100644
index 0000000..a6c3b97
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/project.cpp
@@ -0,0 +1,85 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "project.hpp"
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <cudf/column/column.hpp>
+#include <cudf/table/table.hpp>
+#include <cudf/transform.hpp>
+#include <memory>
+#include <ostream>
+#include <regex>
+#include <vector>
+#include "src/expr.hpp"
+#include "src/plan/plan.hpp"
+#include "src/util.hpp"
+
+// Evaluates every expression of `exprs` against each input table and yields
+// a table whose i-th column is the result of exprs[i].
+class ProjectStream : public TableStream {
+ public:
+  ProjectStream(
+      std::unique_ptr<TableStream> input,
+      const std::vector<std::shared_ptr<ExprBuilder>>& exprs) :
+      input(std::move(input)), exprs(exprs) {
+  }
+
+ public:
+  arrow::Result<bool> has_next() override {
+    return input->has_next();
+  }
+
+  arrow::Result<OwnedTable> next() override {
+    auto next_input = ARROW_TRY_RESULT(input->next());
+    auto computed_columns = std::vector<std::unique_ptr<cudf::column>>();
+    computed_columns.reserve(exprs.size());
+
+    for (const auto& expr : exprs) {
+      // NOTE(review): libcudf reports errors by throwing (e.g. cudf::logic_error);
+      // nothing here converts such exceptions into an arrow::Status.
+      auto column = cudf::compute_column(next_input.view(), expr->expr());
+      computed_columns.push_back(std::move(column));
+    }
+    return OwnedTable(std::make_unique<cudf::table>(std::move(computed_columns)));
+  }
+
+ private:
+  std::unique_ptr<TableStream> input;
+  std::vector<std::shared_ptr<ExprBuilder>> exprs;
+};
+
+ProjectPlan::ProjectPlan(
+    std::shared_ptr<Plan> input,
+    std::vector<std::shared_ptr<ExprBuilder>> exprs,
+    std::shared_ptr<arrow::Schema> schema) :
+    input(std::move(input)), exprs(std::move(exprs)), output_schema(std::move(schema)) {
+}
+
+// One-line description: the output schema with its newlines replaced by ", ".
+void ProjectPlan::desc(std::ostream& oss) const {
+  oss << "CudfProject [";
+  oss << std::regex_replace(output_schema->ToString(), std::regex("\n"), ", ");
+  oss << "]";
+}
+
+std::shared_ptr<arrow::Schema> ProjectPlan::schema() const {
+  return output_schema;
+}
+
+arrow::Result<std::unique_ptr<TableStream>> ProjectPlan::execute() const {
+  auto input_stream = ARROW_TRY_RESULT(input->execute());
+  return arrow::Result(std::make_unique<ProjectStream>(std::move(input_stream), exprs));
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/project.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/project.hpp
new file mode 100644
index 0000000..5a2c0e0
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/project.hpp
@@ -0,0 +1,47 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <arrow/type.h>
+#include <cudf/ast/expressions.hpp>
+#include <memory>
+#include <ostream>
+#include <vector>
+#include "src/expr.hpp"
+#include "src/plan/plan.hpp"
+
+// Plan that computes one output column per expression in `exprs` from each
+// table of `input`; `schema` describes the resulting tables.
+class ProjectPlan : public Plan {
+ public:
+  ProjectPlan(
+      std::shared_ptr<Plan> input,
+      std::vector<std::shared_ptr<ExprBuilder>> exprs,
+      std::shared_ptr<arrow::Schema> schema);
+
+ public:
+  std::shared_ptr<arrow::Schema> schema() const override;
+  arrow::Result<std::unique_ptr<TableStream>> execute() const override;
+  void desc(std::ostream& oss) const override;
+
+  inline std::vector<std::shared_ptr<Plan>> children() const override {
+    return std::vector{input};
+  }
+
+ private:
+  std::shared_ptr<Plan> input;
+  std::vector<std::shared_ptr<ExprBuilder>> exprs;
+  std::shared_ptr<arrow::Schema> output_schema;
+};
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/split_table.cpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/split_table.cpp
new file mode 100644
index 0000000..e6dffb4
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/split_table.cpp
@@ -0,0 +1,117 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "split_table.hpp"
+#include <arrow/status.h>
+#include <algorithm>
+#include <cudf/copying.hpp>
+#include <cudf/table/table.hpp>
+#include <cudf/table/table_view.hpp>
+#include <cudf/types.hpp>
+#include <deque>
+#include <memory>
+#include "src/plan/plan.hpp"
+#include "src/plan/split_table.hpp"
+#include "src/util.hpp"
+
+// Re-chunks the tables of `input_stream` so that no emitted table has more than
+// `batch_size` rows. Larger inputs are cut into evenly sized zero-copy slices that
+// share ownership of the source table; empty input tables are skipped.
+//
+// Requires batch_size > 0 (checked by SplitTablePlan::execute()).
+class SplitTableStream : public TableStream {
+ public:
+  SplitTableStream(std::unique_ptr<TableStream> input_stream, size_t batch_size) :
+      input_stream(std::move(input_stream)), batch_size(batch_size) {
+  }
+
+  arrow::Result<bool> has_next() override {
+    // Loop (rather than recurse) past empty input tables so a long run of them
+    // cannot exhaust the stack.
+    while (buffered_tables.empty()) {
+      auto input_has_next = ARROW_TRY_RESULT(input_stream->has_next());
+      if (!input_has_next) {
+        break;
+      }
+      auto input_next = ARROW_TRY_RESULT(input_stream->next());
+      auto input_num_rows = size_t(input_next.view().num_rows());
+      if (input_num_rows == 0) {
+        continue;  // process next input table
+      }
+
+      auto target_num_splitted_tables = (input_num_rows + batch_size - 1) / batch_size;
+      if (target_num_splitted_tables == 1) {
+        buffered_tables.push_back(std::move(input_next));
+        continue;
+      }
+
+      // Ceiling division keeps the slices evenly sized and each at most batch_size
+      // rows (k = ceil(n / batch_size) slices of ceil(n / k) rows).
+      auto target_num_rows =
+          (input_num_rows + target_num_splitted_tables - 1) / target_num_splitted_tables;
+      auto split_indices = std::vector<cudf::size_type>();
+      split_indices.reserve(2 * target_num_splitted_tables);
+      for (auto start = size_t(0); start < input_num_rows;) {
+        auto split_end = std::min(start + target_num_rows, input_num_rows);
+        split_indices.push_back(cudf::size_type(start));
+        split_indices.push_back(cudf::size_type(split_end));
+        start = split_end;
+      }
+
+      // The slices are views, so keep the source table alive via a shared holder.
+      auto holder = std::make_shared<OwnedTable>(std::move(input_next));
+      auto sliced = cudf::slice(holder->view(), split_indices);
+      for (auto splitted : sliced) {
+        buffered_tables.emplace_back(holder, splitted);
+      }
+    }
+    return !buffered_tables.empty();
+  }
+
+  // Must only be called after has_next() returned true.
+  arrow::Result<OwnedTable> next() override {
+    if (buffered_tables.empty()) {
+      return arrow::Status::Invalid("SplitTableStream::next() called without a buffered table");
+    }
+    auto front = std::move(buffered_tables.front());
+    buffered_tables.pop_front();
+    return front;
+  }
+
+ private:
+  std::unique_ptr<TableStream> input_stream;
+  size_t batch_size;
+  std::deque<OwnedTable> buffered_tables;
+};
+
+SplitTablePlan::SplitTablePlan(std::shared_ptr<Plan> input, size_t batch_size) :
+    input(input), batch_size(batch_size) {
+}
+
+std::shared_ptr<arrow::Schema> SplitTablePlan::schema() const {
+  return input->schema();
+}
+
+arrow::Result<std::unique_ptr<TableStream>> SplitTablePlan::execute() const {
+  // batch_size is a divisor in SplitTableStream; reject 0 up front.
+  if (batch_size == 0) {
+    return arrow::Status::Invalid("SplitTable: batch_size must be greater than 0");
+  }
+  auto input_stream = ARROW_TRY_RESULT(input->execute());
+  return arrow::Result(std::make_unique<SplitTableStream>(std::move(input_stream), batch_size));
+}
+
+void SplitTablePlan::desc(std::ostream& oss) const {
+  oss << "SplitTable []";
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/split_table.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/split_table.hpp
new file mode 100644
index 0000000..556dcab
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/plan/split_table.hpp
@@ -0,0 +1,43 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <arrow/type.h>
+#include <cudf/ast/expressions.hpp>
+#include <cstddef>
+#include <memory>
+#include <ostream>
+#include <vector>
+#include "src/plan/plan.hpp"
+
+// Pass-through plan (same schema as `input`) that re-chunks the input tables
+// into smaller tables targeting at most `batch_size` rows each.
+class SplitTablePlan : public Plan {
+ public:
+  SplitTablePlan(std::shared_ptr<Plan> input, size_t batch_size);
+
+ public:
+  std::shared_ptr<arrow::Schema> schema() const override;
+  arrow::Result<std::unique_ptr<TableStream>> execute() const override;
+  void desc(std::ostream& oss) const override;
+
+  inline std::vector<std::shared_ptr<Plan>> children() const override {
+    return std::vector{input};
+  }
+
+ private:
+  std::shared_ptr<Plan> input;
+  size_t batch_size;
+};
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/src/util.hpp b/native-engine/blaze-cudf-bridge/bridge-cpp/src/util.hpp
new file mode 100644
index 0000000..1d4d374
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/src/util.hpp
@@ -0,0 +1,35 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <utility>
+
+// ARROW_TRY_RESULT(result): evaluates `result`, an arrow::Result<T>. On error
+// the macro returns that error Status from the *enclosing* function; otherwise
+// the whole macro expression evaluates to the moved-out T. Example:
+//
+//   auto input_stream = ARROW_TRY_RESULT(input->execute());
+//
+// Relies on GNU statement expressions `({ ... })` (GCC/Clang). The temporary is
+// named without `__`: identifiers containing a double underscore are reserved
+// for the C++ implementation.
+#define ARROW_TRY_RESULT(result)                    \
+  ({                                                \
+    auto blaze_try_result_tmp_ = std::move(result); \
+    ARROW_RETURN_NOT_OK(blaze_try_result_tmp_);     \
+    blaze_try_result_tmp_.MoveValueUnsafe();        \
+  })
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg.json b/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg.json
new file mode 100644
index 0000000..a50ed56
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg.json
@@ -0,0 +1,10 @@
+{
+  "name": "blaze-cudf-bridge",
+  "version": "0.1.0",
+  "dependencies": [
+    "arrow",
+    "cuda",
+    "libcudf",
+    "spdlog"
+  ]
+}
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg_ports/libcudf/portfile.cmake b/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg_ports/libcudf/portfile.cmake
new file mode 100644
index 0000000..cefdccb
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg_ports/libcudf/portfile.cmake
@@ -0,0 +1,63 @@
+vcpkg_from_github(
+    OUT_SOURCE_PATH SOURCE_PATH
+    REPO rapidsai/cudf
+    # NOTE(review): REF names a moving branch while SHA512 pins one snapshot of
+    # it, so the download check breaks when the branch advances; prefer a tag.
+    REF "branch-25.06"
+    SHA512 49fb54e301e070308d2d3be933c61b249b8e67073dbe36db597950aaff84f61d14038ece830e0d61e322d415547b45cb11b37155fd6606f7c5edf1667363c2e4
+    HEAD_REF main
+)
+
+# check for nvcc
+execute_process(
+    COMMAND nvcc --version
+    OUTPUT_VARIABLE NVCC_VERSION_OUTPUT
+    ERROR_VARIABLE NVCC_VERSION_ERROR
+    RESULT_VARIABLE NVCC_VERSION_RESULT
+)
+# Test the variable by name: if nvcc cannot be started, RESULT_VARIABLE holds an
+# error string, whose unquoted ${...} expansion would split into several words
+# and turn this if() into a syntax error instead of reaching FATAL_ERROR.
+if(NOT NVCC_VERSION_RESULT EQUAL 0)
+    message(FATAL_ERROR "execute nvcc --version failed (${NVCC_VERSION_RESULT}): ${NVCC_VERSION_ERROR}")
+endif()
+
+# NOTE(review): CMake keeps the inner double quotes of --cmake-args="..." as
+# literal characters of that single argument; cudf's build.sh appears to parse
+# exactly that form, so check build.sh before changing the quoting.
+set(BUILD_ARGS --disable_large_strings --cmake-args="-DCUDA_STATIC_RUNTIME=OFF -DBUILD_SHARED_LIBS=OFF")
+set(ENV{INSTALL_PREFIX} "${CURRENT_PACKAGES_DIR}")
+
+# Every step below uses its own LOGNAME: vcpkg writes <LOGNAME>-out.log and
+# <LOGNAME>-err.log, so a shared name let each step overwrite the previous logs.
+
+# build (`-n`: skip build.sh's install step for now)
+vcpkg_execute_required_process(
+    COMMAND "${SOURCE_PATH}/build.sh" libcudf ${BUILD_ARGS} -n
+    WORKING_DIRECTORY "${SOURCE_PATH}"
+    LOGNAME "build-${TARGET_TRIPLET}"
+)
+
+# patch for installation
+# NOTE(review): comments out lines 230-231 of the fetched rapids-cmake
+# nvcomp.cmake by line number; re-check them whenever REF changes.
+vcpkg_execute_required_process(
+    COMMAND sed -i "230,231s/^/#/" "./cpp/build/_deps/rapids-cmake-src/rapids-cmake/cpm/nvcomp.cmake"
+    WORKING_DIRECTORY "${SOURCE_PATH}"
+    LOGNAME "patch-nvcomp-${TARGET_TRIPLET}"
+)
+file(MAKE_DIRECTORY "${CURRENT_PACKAGES_DIR}/usr/lib64")
+
+# install
+vcpkg_execute_required_process(
+    COMMAND "${SOURCE_PATH}/build.sh" libcudf ${BUILD_ARGS}
+    WORKING_DIRECTORY "${SOURCE_PATH}"
+    LOGNAME "install-${TARGET_TRIPLET}"
+)
+
+# fix some file conflicts: rename the zstd headers installed with libcudf
+# (presumably to avoid clashing with other ports' copies). file(RENAME) fails
+# if a header is absent.
+file(RENAME "${CURRENT_PACKAGES_DIR}/include/zdict.h" "${CURRENT_PACKAGES_DIR}/include/cudf-zdict.h")
+file(RENAME "${CURRENT_PACKAGES_DIR}/include/zstd.h" "${CURRENT_PACKAGES_DIR}/include/cudf-zstd.h")
+file(RENAME "${CURRENT_PACKAGES_DIR}/include/zstd_errors.h" "${CURRENT_PACKAGES_DIR}/include/cudf-zstd_errors.h")
diff --git a/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg_ports/libcudf/vcpkg.json b/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg_ports/libcudf/vcpkg.json
new file mode 100644
index 0000000..2115af5
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/bridge-cpp/vcpkg_ports/libcudf/vcpkg.json
@@ -0,0 +1,15 @@
+{
+  "name": "libcudf",
+  "version": "25.6",
+  "description": "GPU DataFrame Library for Apache Arrow",
+  "homepage": "https://github.com/rapidsai/cudf",
+  "license": "Apache-2.0",
+  "dependencies": [
+    {
+      "name": "vcpkg-cmake",
+      "host": true
+    },
+    "cuda"
+  ],
+  "supports": "linux"
+}
diff --git a/native-engine/blaze-cudf-bridge/build.rs b/native-engine/blaze-cudf-bridge/build.rs
new file mode 100644
index 0000000..e50b2d1
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/build.rs
@@ -0,0 +1,25 @@
+use std::{env, path::PathBuf};
+
+/// Links this crate against the prebuilt C++ bridge library
+/// `libblaze-cudf-bridge` (this script does not build it).
+///
+/// Paths are derived from `CARGO_MANIFEST_DIR`: Cargo resolves a relative
+/// `rerun-if-changed` path against the package root, where the old
+/// workspace-relative path does not exist (so the script re-ran on every
+/// build), and a relative link-search path depends on rustc's working directory.
+fn main() {
+    let manifest_dir = PathBuf::from(
+        env::var_os("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR is set by cargo"),
+    );
+    let lib_dir = manifest_dir.join("bridge-cpp").join("build");
+
+    // NOTE(review): `$ORIGIN` is the directory of the linked artifact, so this
+    // workspace-relative rpath only resolves if that artifact sits at the
+    // workspace root; confirm the intended runtime layout.
+    let workspace_rel_lib_path = "native-engine/blaze-cudf-bridge/bridge-cpp/build";
+
+    println!("cargo::rustc-link-search=native={}", lib_dir.display());
+    println!("cargo::rustc-link-arg=-Wl,-rpath,$ORIGIN/{workspace_rel_lib_path}");
+    println!("cargo::rustc-link-lib=dylib=blaze-cudf-bridge");
+    println!("cargo::rerun-if-changed={}", lib_dir.display());
+}
diff --git a/native-engine/blaze-cudf-bridge/src/exprs.rs b/native-engine/blaze-cudf-bridge/src/exprs.rs
new file mode 100644
index 0000000..a6a7772
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/exprs.rs
@@ -0,0 +1,157 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::TypeId, ffi::CString, str::FromStr};
+
+use num::ToPrimitive;
+
+use crate::{ffi::*, AsRawPtrAddr};
+
+/// Operator accepted by [`CudfExprBuilder::add_unary`] / [`CudfExprBuilder::add_binary`]
+/// (mirrors libcudf's `<cudf/ast/expressions.hpp>` operator enum).
+pub type CudfExprOperator = FFI_AstOperator;
+
+/// Incrementally builds a libcudf AST expression on the C++ side.
+///
+/// Every `add_*` method returns a `usize` node id; ids are what the
+/// `arg*_expr_id` parameters of [`add_unary`](Self::add_unary) and
+/// [`add_binary`](Self::add_binary) take to compose nodes.
+pub struct CudfExprBuilder {
+    expr_builder: FFI_ExprBuilderPtr,
+}
+
+impl AsRawPtrAddr for CudfExprBuilder {
+    fn as_raw_ptr_addr(&self) -> usize {
+        self.expr_builder.as_raw_ptr_addr()
+    }
+}
+
+impl Default for CudfExprBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl CudfExprBuilder {
+    /// Creates an empty builder.
+    pub fn new() -> Self {
+        Self {
+            expr_builder: unsafe { ffi_new_expr_builder() },
+        }
+    }
+
+    /// Adds a literal node holding `scalar`.
+    pub fn add_literal(&self, scalar: &CudfScalar) -> usize {
+        unsafe { ffi_expr_builder_add_literal(self.as_raw_ptr_addr(), scalar.as_raw_ptr_addr()) }
+    }
+
+    /// Adds a reference to the input column at position `column_idx`.
+    pub fn add_column_ref(&self, column_idx: usize) -> usize {
+        unsafe { ffi_expr_builder_add_column_reference(self.as_raw_ptr_addr(), column_idx) }
+    }
+
+    /// Adds a reference to the input column called `name`.
+    ///
+    /// # Panics
+    /// If `name` contains an interior NUL byte.
+    pub fn add_column_name_ref(&self, name: &str) -> usize {
+        let name_c_str = CString::from_str(name).unwrap();
+        unsafe {
+            ffi_expr_builder_add_column_name_reference(self.as_raw_ptr_addr(), name_c_str.as_ptr())
+        }
+    }
+
+    /// Adds the unary operation `op(arg1)`.
+    pub fn add_unary(&self, arg1_expr_id: usize, op: CudfExprOperator) -> usize {
+        unsafe { ffi_expr_builder_add_unary_expr(self.as_raw_ptr_addr(), arg1_expr_id, op) }
+    }
+
+    /// Adds the binary operation `op(arg1, arg2)`.
+    pub fn add_binary(
+        &self,
+        arg1_expr_id: usize,
+        arg2_expr_id: usize,
+        op: CudfExprOperator,
+    ) -> usize {
+        unsafe {
+            ffi_expr_builder_add_binary_expr(self.as_raw_ptr_addr(), arg1_expr_id, arg2_expr_id, op)
+        }
+    }
+}
+
+/// A libcudf scalar owned by the C++ side, usable as an expression literal.
+pub struct CudfScalar {
+    scalar: FFI_ScalarPtr,
+}
+
+impl AsRawPtrAddr for CudfScalar {
+    fn as_raw_ptr_addr(&self) -> usize {
+        self.scalar.as_raw_ptr_addr()
+    }
+}
+
+impl CudfScalar {
+    /// Creates a fixed-width numeric scalar of type `T`.
+    ///
+    /// Supported: `i8`, `i16`, `i32`, `i64`, `u8`, `u16`, `u32`, `u64`, `f32`, `f64`,
+    /// plus `isize`/`usize`, which are stored as `i64`/`u64`.
+    ///
+    /// # Panics
+    /// For any other `T`.
+    pub fn new_prim<T: Copy + ToPrimitive + 'static>(value: T) -> Self {
+        let typeid = TypeId::of::<T>();
+        let scalar = unsafe {
+            match () {
+                _ if typeid == TypeId::of::<i8>() => ffi_i8_scalar(value.to_i8().unwrap()),
+                _ if typeid == TypeId::of::<i16>() => ffi_i16_scalar(value.to_i16().unwrap()),
+                _ if typeid == TypeId::of::<i32>() => ffi_i32_scalar(value.to_i32().unwrap()),
+                _ if typeid == TypeId::of::<i64>() => ffi_i64_scalar(value.to_i64().unwrap()),
+                _ if typeid == TypeId::of::<isize>() => ffi_i64_scalar(value.to_i64().unwrap()),
+                _ if typeid == TypeId::of::<u8>() => ffi_u8_scalar(value.to_u8().unwrap()),
+                _ if typeid == TypeId::of::<u16>() => ffi_u16_scalar(value.to_u16().unwrap()),
+                _ if typeid == TypeId::of::<u32>() => ffi_u32_scalar(value.to_u32().unwrap()),
+                _ if typeid == TypeId::of::<u64>() => ffi_u64_scalar(value.to_u64().unwrap()),
+                _ if typeid == TypeId::of::<usize>() => ffi_u64_scalar(value.to_u64().unwrap()),
+                _ if typeid == TypeId::of::<f32>() => ffi_f32_scalar(value.to_f32().unwrap()),
+                _ if typeid == TypeId::of::<f64>() => ffi_f64_scalar(value.to_f64().unwrap()),
+                // Reachable for e.g. i128/u128, so name the type instead of
+                // claiming the branch is unreachable.
+                _ => panic!(
+                    "unimplemented fixed width scalar type: {}",
+                    std::any::type_name::<T>()
+                ),
+            }
+        };
+        Self { scalar }
+    }
+
+    /// Creates a string scalar.
+    ///
+    /// # Panics
+    /// If `value` contains an interior NUL byte.
+    pub fn new_string(value: &str) -> Self {
+        let c_str = CString::from_str(value).unwrap();
+        let scalar = unsafe { ffi_string_scalar(c_str.as_ptr()) };
+        Self { scalar }
+    }
+
+    /// Creates a boolean scalar.
+    ///
+    /// NOTE(review): encoded as an `i8` scalar (1/0) because no bool FFI
+    /// constructor is declared; confirm consumers expect INT8 rather than BOOL8.
+    pub fn new_bool(value: bool) -> Self {
+        let scalar = unsafe { ffi_i8_scalar(if value { 1 } else { 0 }) };
+        Self { scalar }
+    }
+}
diff --git a/native-engine/blaze-cudf-bridge/src/ffi.rs b/native-engine/blaze-cudf-bridge/src/ffi.rs
new file mode 100644
index 0000000..6e06ef8
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/ffi.rs
@@ -0,0 +1,282 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use paste::paste;
+
+use crate::io::CudfDataSource;
+
+// Declares `$tyname`, an owning handle to a C++ object stored as its raw
+// address. Dropping the handle passes that address to the C++ function
+// `ffi_<$tyname>_delete` (name generated with `paste!`).
+macro_rules! define_ffi_obj_wrapped_type {
+    ($tyname:ident) => {
+        #[repr(C, align(8))]
+        #[allow(non_camel_case_types)]
+        pub struct $tyname {
+            raw_ptr_addr: usize,
+        }
+
+        impl $tyname {
+            #[allow(unused)]
+            pub fn as_raw_ptr_addr(&self) -> usize {
+                return self.raw_ptr_addr;
+            }
+        }
+
+        impl Drop for $tyname {
+            fn drop(&mut self) {
+                unsafe {
+                    paste! {
+                        extern "C" {
+                            fn [<ffi_ $tyname _delete>] (raw_ptr_addr: usize) -> ();
+                        }
+                        [<ffi_ $tyname _delete>] (self.raw_ptr_addr);
+                    }
+                }
+            }
+        }
+    };
+}
+
+// Declares `$tyname`, the C-layout `{ ok, value }` pair returned by fallible
+// C++ calls. It carries no error text, so `into_result` takes the message to
+// report from the caller.
+macro_rules! define_ffi_result_type {
+    ($tyname:ident, $t:ty) => {
+        // NOTE(review): layout must match the C++ result struct; `align(4)` is
+        // only a minimum, so an 8-byte-aligned `$t` still yields 8-byte alignment.
+        #[repr(C, align(4))]
+        #[allow(non_camel_case_types)]
+        pub struct $tyname {
+            pub ok: bool,
+            pub value: $t,
+        }
+
+        impl $tyname {
+            #[allow(unused)]
+            pub fn into_result(self, errmsg: String) -> arrow::error::Result<$t> {
+                if !self.ok {
+                    return Err(arrow::error::ArrowError::ComputeError(errmsg));
+                }
+                Ok(self.value)
+            }
+        }
+    };
+}
+
+// Owning handles to C++ objects (freed on drop).
+define_ffi_obj_wrapped_type!(FFI_PlanPtr);
+define_ffi_obj_wrapped_type!(FFI_TableStreamPtr);
+define_ffi_obj_wrapped_type!(FFI_TablePtr);
+define_ffi_obj_wrapped_type!(FFI_ScalarPtr);
+define_ffi_obj_wrapped_type!(FFI_ExprBuilderPtr);
+
+// Return types of fallible FFI calls.
+define_ffi_result_type!(FFI_VoidResult, ());
+define_ffi_result_type!(FFI_BoolResult, bool);
+define_ffi_result_type!(FFI_TableStreamPtrResult, FFI_TableStreamPtr);
+define_ffi_result_type!(FFI_TableResult, FFI_TablePtr);
+define_ffi_result_type!(FFI_USizeResult, usize);
+
+/// Splits a slice into the (pointer, length) pair expected by C functions.
+pub fn slice_to_const_ptr_and_len<T>(vec: &[T]) -> (*const T, usize) {
+    let len = vec.len();
+    let ptr = vec.as_ptr();
+    (ptr, len)
+}
+
+/// Mutable counterpart of [`slice_to_const_ptr_and_len`].
+pub fn slice_to_mut_ptr_and_len<T>(vec: &mut [T]) -> (*mut T, usize) {
+    let len = vec.len();
+    let ptr = vec.as_mut_ptr();
+    (ptr, len)
+}
+
+// Functions exported by the C++ bridge library. Arguments named `*_ptr_addr`
+// are addresses obtained from `as_raw_ptr_addr()` on the matching handle.
+#[allow(unused)]
+extern "C" {
+    // Plan constructors.
+    pub fn ffi_new_parquet_scan_plan(
+        data_source: *mut FFI_CudfDataSource,
+        predicate_filter_expr_ptr_addr: usize,
+        read_offset: usize,
+        read_size: usize,
+        schema: *mut FFI_ArrowSchema,
+    ) -> FFI_PlanPtr;
+
+    pub fn ffi_new_split_table_plan(input_plan_ptr_addr: usize, batch_size: usize) -> FFI_PlanPtr;
+
+    pub fn ffi_new_union_plan(
+        input_plan_ptr_addrs: *const usize,
+        input_plan_len: usize,
+        schema: *mut FFI_ArrowSchema,
+    ) -> FFI_PlanPtr;
+
+    pub fn ffi_new_project_plan(
+        input_plan_ptr_addr: usize,
+        exprs_ptr_addrs: *const usize,
+        exprs_len: usize,
+        schema: *mut FFI_ArrowSchema,
+    ) -> FFI_PlanPtr;
+
+    pub fn ffi_new_filter_plan(input_plan_ptr_addr: usize, expr_ptr_addr: usize) -> FFI_PlanPtr;
+
+    // Execution: run a plan, then pull tables from the resulting stream.
+    pub fn ffi_execute_plan(plan_ptr_addr: usize) -> FFI_TableStreamPtrResult;
+
+    pub fn ffi_table_stream_has_next(table_stream_ptr_addr: usize) -> FFI_BoolResult;
+
+    pub fn ffi_table_stream_next(table_stream_ptr_addr: usize) -> FFI_TableResult;
+
+    // Exports a table through the Arrow C data interface into the given structs.
+    pub fn ffi_table_to_arrow(
+        table_ptr_addr: usize,
+        ffi_schema: *mut FFI_ArrowSchema,
+        ffi_array: *mut FFI_ArrowArray,
+    ) -> FFI_VoidResult;
+
+    // Expression construction.
+    pub fn ffi_new_expr_builder() -> FFI_ExprBuilderPtr;
+
+    // Scalar constructors (used as expression literals).
+    pub fn ffi_i8_scalar(value: i8) -> FFI_ScalarPtr;
+    pub fn ffi_i16_scalar(value: i16) -> FFI_ScalarPtr;
+    pub fn ffi_i32_scalar(value: i32) -> FFI_ScalarPtr;
+    pub fn ffi_i64_scalar(value: i64) -> FFI_ScalarPtr;
+    pub fn ffi_u8_scalar(value: u8) -> FFI_ScalarPtr;
+    pub fn ffi_u16_scalar(value: u16) -> FFI_ScalarPtr;
+    pub fn ffi_u32_scalar(value: u32) -> FFI_ScalarPtr;
+    pub fn ffi_u64_scalar(value: u64) -> FFI_ScalarPtr;
+    pub fn ffi_f32_scalar(value: f32) -> FFI_ScalarPtr;
+    pub fn ffi_f64_scalar(value: f64) -> FFI_ScalarPtr;
+    pub fn ffi_string_scalar(value: *const std::ffi::c_char) -> FFI_ScalarPtr;
+
+    // Expression builder nodes; each call returns the id of the added node,
+    // which later calls accept as `arg*_expr_id`.
+    pub fn ffi_expr_builder_add_literal(
+        expr_builder_ptr_addr: usize,
+        scalar_ptr_addr: usize,
+    ) -> usize;
+    pub fn ffi_expr_builder_add_column_reference(
+        expr_builder_ptr_addr: usize,
+        column_idx: usize,
+    ) -> usize;
+    pub fn ffi_expr_builder_add_column_name_reference(
+        expr_builder_ptr_addr: usize,
+        name: *const std::ffi::c_char,
+    ) -> usize;
+    pub fn ffi_expr_builder_add_unary_expr(
+        expr_builder_ptr_addr: usize,
+        arg1_expr_id: usize,
+        op: FFI_AstOperator,
+    ) -> usize;
+    pub fn ffi_expr_builder_add_binary_expr(
+        expr_builder_ptr_addr: usize,
+        arg1_expr_id: usize,
+        arg2_expr_id: usize,
+        op: FFI_AstOperator,
+    ) -> usize;
+}
+
+// Mirror of libcudf's AST operator enum. The discriminants (ADD = 0, then
+// sequential) are the ABI: keep the variant order identical to the libcudf
+// headers of the version being linked.
+#[repr(i32)]
+#[allow(non_camel_case_types)]
+#[allow(unused)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum FFI_AstOperator {
+    // this enum is copied from <cudf/ast/expressions.hpp>
+    // Binary operators
+    ADD = 0,          // < operator +
+    SUB,              // < operator -
+    MUL,              // < operator *
+    DIV,              // < operator / using common type of lhs and rhs
+    TRUE_DIV,         // < operator / after promoting type to floating point
+    FLOOR_DIV,        // < operator / after promoting to 64 bit floating point and then
+                      // < flooring the result
+    MOD,              // < operator %
+    PYMOD,            // < operator % using Python's sign rules for negatives
+    POW,              // < lhs ^ rhs
+    EQUAL,            // < operator ==
+    NULL_EQUAL,       // < operator == with Spark rules: NULL_EQUAL(null, null) is true,
+                      // < NULL_EQUAL(null, valid) is false, and
+                      // < NULL_EQUAL(valid, valid) == EQUAL(valid, valid)
+    NOT_EQUAL,        // < operator !=
+    LESS,             // < operator <
+    GREATER,          // < operator >
+    LESS_EQUAL,       // < operator <=
+    GREATER_EQUAL,    // < operator >=
+    BITWISE_AND,      // < operator &
+    BITWISE_OR,       // < operator |
+    BITWISE_XOR,      // < operator ^
+    LOGICAL_AND,      // < operator &&
+    NULL_LOGICAL_AND, // < operator && with Spark rules: NULL_LOGICAL_AND(null, null) is null,
+                      // < NULL_LOGICAL_AND(null, true) is null,
+                      // < NULL_LOGICAL_AND(null, false) is false, and
+                      // < NULL_LOGICAL_AND(valid, valid) == LOGICAL_AND(valid, valid)
+    LOGICAL_OR,       // < operator ||
+    NULL_LOGICAL_OR,  // < operator || with Spark rules: NULL_LOGICAL_OR(null, null) is null,
+                      // < NULL_LOGICAL_OR(null, true) is true,
+                      // < NULL_LOGICAL_OR(null, false) is null, and
+                      // < NULL_LOGICAL_OR(valid, valid) == LOGICAL_OR(valid, valid)
+    // Unary operators
+    IDENTITY,         // < Identity function
+    IS_NULL,          // < Check if operand is null
+    SIN,              // < Trigonometric sine
+    COS,              // < Trigonometric cosine
+    TAN,              // < Trigonometric tangent
+    ARCSIN,           // < Trigonometric sine inverse
+    ARCCOS,           // < Trigonometric cosine inverse
+    ARCTAN,           // < Trigonometric tangent inverse
+    SINH,             // < Hyperbolic sine
+    COSH,             // < Hyperbolic cosine
+    TANH,             // < Hyperbolic tangent
+    ARCSINH,          // < Hyperbolic sine inverse
+    ARCCOSH,          // < Hyperbolic cosine inverse
+    ARCTANH,          // < Hyperbolic tangent inverse
+    EXP,              // < Exponential (base e, Euler number)
+    LOG,              // < Natural Logarithm (base e)
+    SQRT,             // < Square-root (x^0.5)
+    CBRT,             // < Cube-root (x^(1.0/3))
+    CEIL,             // < Smallest integer value not less than arg
+    FLOOR,            // < largest integer value not greater than arg
+    ABS,              // < Absolute value
+    RINT,             // < Rounds the floating-point argument arg to an integer value
+    BIT_INVERT,       // < Bitwise Not (~)
+    NOT,              // < Logical Not (!)
+    CAST_TO_INT64,    // < Cast value to int64_t
+    CAST_TO_UINT64,   // < Cast value to uint64_t
+    CAST_TO_FLOAT64,  // < Cast value to double
+}
+
+// C-ABI view of a Rust `CudfDataSource`, handed to `ffi_new_parquet_scan_plan`
+// so C++ can read through these callbacks. `raw` is the boxed data source;
+// `dtor` frees it (presumably called by the C++ side when done; confirm).
+#[repr(C, align(8))]
+#[allow(non_camel_case_types)]
+pub struct FFI_CudfDataSource {
+    pub dtor: extern "C" fn(this: *mut FFI_CudfDataSource) -> (),
+    pub data_size: extern "C" fn(this: *const FFI_CudfDataSource) -> usize,
+    pub read: extern "C" fn(
+        this: *mut FFI_CudfDataSource,
+        offset: usize,
+        buf: *mut u8,
+        buf_size: usize,
+    ) -> FFI_USizeResult,
+
+    pub raw: *mut Box<dyn CudfDataSource>,
+}
diff --git a/native-engine/blaze-cudf-bridge/src/io.rs b/native-engine/blaze-cudf-bridge/src/io.rs
new file mode 100644
index 0000000..0dde996
--- /dev/null
+++ b/native-engine/blaze-cudf-bridge/src/io.rs
@@ -0,0 +1,144 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{
+    fs::File,
+    io::{Read, Seek},
+};
+
+use arrow::error::Result;
+use parquet::file::reader::Length;
+
+use crate::ffi::{FFI_CudfDataSource, FFI_USizeResult};
+
+/// Random-access byte source that the C++ parquet scan reads through FFI.
+pub trait CudfDataSource {
+    /// Total size of the data in bytes.
+    fn data_size(&self) -> usize;
+    /// Reads bytes starting at `offset` into `buf` and returns how many were read;
+    /// implementations should fill `buf` unless the end of the data is reached.
+    fn read(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize>;
+}
+
+/// Conversion into the C-ABI struct passed to `ffi_new_parquet_scan_plan`.
+pub trait IntoFFICudfDataSource {
+    fn into_ffi(self) -> FFI_CudfDataSource;
+}
+
+impl IntoFFICudfDataSource for Box<dyn CudfDataSource> {
+    fn into_ffi(self) -> FFI_CudfDataSource {
+        // Frees the boxed data source. `raw` is nulled first so a repeated call
+        // is a no-op instead of a double free.
+        extern "C" fn dtor(this: *mut FFI_CudfDataSource) {
+            unsafe {
+                let this_mut = this.as_mut().expect("FFI_CudfDataSource is null");
+                let raw = std::mem::replace(&mut this_mut.raw, std::ptr::null_mut());
+                if !raw.is_null() {
+                    drop(Box::from_raw(raw));
+                }
+            }
+        }
+
+        extern "C" fn data_size(this: *const FFI_CudfDataSource) -> usize {
+            unsafe {
+                let this_ref = this.as_ref().expect("FFI_CudfDataSource is null");
+                let raw_ref = this_ref
+                    .raw
+                    .as_ref()
+                    .expect("FFI_CudfDataSource.raw is null");
+                raw_ref.data_size()
+            }
+        }
+
+        // Reads into the caller-provided buffer `buf[..buf_size]`. Errors surface only
+        // as `ok == false`: FFI_USizeResult has no room for an error message.
+        extern "C" fn read(
+            this: *mut FFI_CudfDataSource,
+            offset: usize,
+            buf: *mut u8,
+            buf_size: usize,
+        ) -> FFI_USizeResult {
+            // `slice::from_raw_parts_mut` requires a non-null pointer even for an
+            // empty slice, and a caller may pass null together with size 0.
+            if buf_size == 0 {
+                return FFI_USizeResult {
+                    ok: true,
+                    value: 0,
+                };
+            }
+            unsafe {
+                let this_mut = this.as_mut().expect("FFI_CudfDataSource is null");
+                let raw_mut = this_mut
+                    .raw
+                    .as_mut()
+                    .expect("FFI_CudfDataSource.raw is null");
+                let buf_mut = std::slice::from_raw_parts_mut(buf, buf_size);
+                match raw_mut.read(offset, buf_mut) {
+                    Ok(read_size) => FFI_USizeResult {
+                        ok: true,
+                        value: read_size,
+                    },
+                    Err(_err) => FFI_USizeResult {
+                        ok: false,
+                        value: 0,
+                    },
+                }
+            }
+        }
+
+        FFI_CudfDataSource {
+            dtor,
+            data_size,
+            read,
+            raw: Box::into_raw(Box::new(self)),
+        }
+    }
+}
+
+/// Data source backed by a local file.
+pub struct CudfLocalFileDataSource {
+    file_path: String,
+    file: File,
+}
+
+impl CudfLocalFileDataSource {
+    /// Opens `file_path` for reading.
+    pub fn try_new(file_path: String) -> Result<Self> {
+        let file = File::open(&file_path)?;
+        Ok(Self { file_path, file })
+    }
+}
+
+impl CudfDataSource for CudfLocalFileDataSource {
+    fn data_size(&self) -> usize {
+        self.file.len() as usize
+    }
+
+    /// Seeks to `offset` and reads until `buf` is full or end of file. A single
+    /// `File::read` may legally return fewer bytes than requested, so keep reading
+    /// instead of handing a short read back to the caller.
+    fn read(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
+        self.file.seek(std::io::SeekFrom::Start(offset as u64))?;
+        let mut read_size = 0;
+        while read_size < buf.len() {
+            match self.file.read(&mut buf[read_size..]) {
+                Ok(0) => break, // end of file
+                Ok(n) => read_size += n,
+                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
+                Err(e) => return Err(e.into()),
+            }
+        }
+        Ok(read_size)
+    }
+}
diff --git a/native-engine/blaze-cudf-bridge/src/lib.rs b/native-engine/blaze-cudf-bridge/src/lib.rs new file mode 100644 index 0000000..28f4812 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/src/lib.rs
@@ -0,0 +1,25 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod exprs; +pub mod io; +pub mod plans; +pub mod tables; + +#[allow(unused)] +mod ffi; // provides the `ffi_*` functions and `FFI_*` types used by the other modules +/// Exposes the address of the underlying native object as a `usize`, the form taken by the `ffi_*` functions. +pub trait AsRawPtrAddr { + fn as_raw_ptr_addr(&self) -> usize; +}
diff --git a/native-engine/blaze-cudf-bridge/src/plans/filter.rs b/native-engine/blaze-cudf-bridge/src/plans/filter.rs new file mode 100644 index 0000000..7653dd1 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/src/plans/filter.rs
@@ -0,0 +1,39 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow::error::Result; + +use crate::{exprs::CudfExprBuilder, ffi::*, plans::CudfPlan, AsRawPtrAddr}; +/// Handle to a native plan built by `ffi_new_filter_plan` from an input plan and a predicate expression. +pub struct CudfFilterPlan { + plan: FFI_PlanPtr, +} + +impl AsRawPtrAddr for CudfFilterPlan { + fn as_raw_ptr_addr(&self) -> usize { + self.plan.as_raw_ptr_addr() + } +} + +impl CudfPlan for CudfFilterPlan {} + +impl CudfFilterPlan { + pub fn try_new(input_plan: Arc<dyn CudfPlan>, expr: &CudfExprBuilder) -> Result<Self> { // always Ok: the FFI result is not checked here + let plan = + unsafe { ffi_new_filter_plan(input_plan.as_raw_ptr_addr(), expr.as_raw_ptr_addr()) }; // NOTE(review): input_plan is dropped on return; assumes the native plan keeps its own reference, confirm + Ok(Self { plan }) + } +}
diff --git a/native-engine/blaze-cudf-bridge/src/plans/mod.rs b/native-engine/blaze-cudf-bridge/src/plans/mod.rs new file mode 100644 index 0000000..175ebc8 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/src/plans/mod.rs
@@ -0,0 +1,36 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow::error::Result; + +use crate::{ffi::*, tables::CudfTableStream, AsRawPtrAddr}; + +pub mod filter; +pub mod parquet_scan; +pub mod project; +pub mod split_table; +pub mod union; +/// Shared (`Arc`) handle to a cudf plan node. +pub type CudfPlanRef = Arc<dyn CudfPlan>; +/// A native plan node; `execute` runs it through `ffi_execute_plan` and wraps the resulting table stream. +pub trait CudfPlan: AsRawPtrAddr + Send + Sync + 'static { // NOTE(review): Send/Sync is asserted for native handles; confirm the native side is safe to use across threads + fn execute(&self) -> Result<CudfTableStream> { + let raw_table_stream_result = unsafe { ffi_execute_plan(self.as_raw_ptr_addr()) }; + let raw_table_stream = + raw_table_stream_result.into_result(format!("error execute plan"))?; + Ok(CudfTableStream::from_raw_table_stream(raw_table_stream)) + } +}
diff --git a/native-engine/blaze-cudf-bridge/src/plans/parquet_scan.rs b/native-engine/blaze-cudf-bridge/src/plans/parquet_scan.rs new file mode 100644 index 0000000..b52985f --- /dev/null +++ b/native-engine/blaze-cudf-bridge/src/plans/parquet_scan.rs
@@ -0,0 +1,62 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow::{ + datatypes::{DataType, SchemaRef}, + error::Result, + ffi::FFI_ArrowSchema, +}; + +use crate::{ + exprs::CudfExprBuilder, + ffi::*, + io::{CudfDataSource, IntoFFICudfDataSource}, + plans::CudfPlan, + AsRawPtrAddr, +}; +/// Handle to a native Parquet scan plan created by `ffi_new_parquet_scan_plan`. +pub struct CudfParquetScanPlan { + plan: FFI_PlanPtr, +} + +impl AsRawPtrAddr for CudfParquetScanPlan { + fn as_raw_ptr_addr(&self) -> usize { + self.plan.as_raw_ptr_addr() + } +} + +impl CudfPlan for CudfParquetScanPlan {} + +impl CudfParquetScanPlan { + pub fn try_new( + data_source: Box<dyn CudfDataSource>, + predicate_filter_expr: &CudfExprBuilder, + read_offset: usize, // start of the byte range to scan + read_size: usize, // length of the byte range to scan + schema: SchemaRef, + ) -> Result<Self> { + let schema_type = DataType::Struct(schema.fields().clone()); // the schema is handed over as one Arrow struct type + let mut ffi_schema = FFI_ArrowSchema::try_from(schema_type)?; + let plan = unsafe { + ffi_new_parquet_scan_plan( + &mut data_source.into_ffi(), // temporary dropped at the end of this statement; assumes the native side takes over `raw` and `dtor`, TODO confirm + predicate_filter_expr.as_raw_ptr_addr(), + read_offset, + read_size, + &mut ffi_schema, + ) + }; + Ok(Self { plan }) + } +}
diff --git a/native-engine/blaze-cudf-bridge/src/plans/project.rs b/native-engine/blaze-cudf-bridge/src/plans/project.rs new file mode 100644 index 0000000..a6ded26 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/src/plans/project.rs
@@ -0,0 +1,62 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow::{ + datatypes::{DataType, SchemaRef}, + error::Result, + ffi::FFI_ArrowSchema, +}; + +use crate::{exprs::CudfExprBuilder, ffi::*, plans::CudfPlan, AsRawPtrAddr}; +/// Handle to a native projection plan created by `ffi_new_project_plan`. +pub struct CudfProjectPlan { + plan: FFI_PlanPtr, +} + +impl AsRawPtrAddr for CudfProjectPlan { + fn as_raw_ptr_addr(&self) -> usize { + self.plan.as_raw_ptr_addr() + } +} + +impl CudfPlan for CudfProjectPlan {} + +impl CudfProjectPlan { + pub fn try_new( + input_plan: Arc<dyn CudfPlan>, + exprs: &[&CudfExprBuilder], // presumably one expression per field of `schema`, TODO confirm + schema: SchemaRef, + ) -> Result<Self> { + let exprs_ptr_addrs = exprs + .iter() + .map(|expr| expr.as_raw_ptr_addr()) + .collect::<Vec<_>>(); + let (exprs_ptr, exprs_len) = slice_to_const_ptr_and_len(&exprs_ptr_addrs); // presumably points into exprs_ptr_addrs, which stays alive until the FFI call below returns + + let schema_type = DataType::Struct(schema.fields().clone()); + let mut ffi_schema = FFI_ArrowSchema::try_from(schema_type)?; + + let plan = unsafe { + ffi_new_project_plan( + input_plan.as_raw_ptr_addr(), + exprs_ptr, + exprs_len, + &mut ffi_schema, + ) + }; + Ok(Self { plan }) + } +}
diff --git a/native-engine/blaze-cudf-bridge/src/plans/split_table.rs b/native-engine/blaze-cudf-bridge/src/plans/split_table.rs new file mode 100644 index 0000000..65042a9 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/src/plans/split_table.rs
@@ -0,0 +1,36 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use crate::{ffi::*, plans::CudfPlan, AsRawPtrAddr}; +/// Handle to a native plan created by `ffi_new_split_table_plan`; by its name it presumably splits input tables into chunks of at most `batch_size` rows (TODO confirm). +pub struct CudfSplitTablePlan { + plan: FFI_PlanPtr, +} + +impl AsRawPtrAddr for CudfSplitTablePlan { + fn as_raw_ptr_addr(&self) -> usize { + self.plan.as_raw_ptr_addr() + } +} + +impl CudfPlan for CudfSplitTablePlan {} + +impl CudfSplitTablePlan { + pub fn new(input_plan: Arc<dyn CudfPlan>, batch_size: usize) -> Self { + let plan = unsafe { ffi_new_split_table_plan(input_plan.as_raw_ptr_addr(), batch_size) }; + Self { plan } + } +}
diff --git a/native-engine/blaze-cudf-bridge/src/tables.rs b/native-engine/blaze-cudf-bridge/src/tables.rs new file mode 100644 index 0000000..14abf91 --- /dev/null +++ b/native-engine/blaze-cudf-bridge/src/tables.rs
@@ -0,0 +1,93 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow::{ + array::{make_array, AsArray, RecordBatch}, + datatypes::{DataType, SchemaRef}, + error::Result, + ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema}, +}; + +use crate::{ffi::*, AsRawPtrAddr}; +/// Iterator over the tables produced by a native plan (returned by `CudfPlan::execute`). +pub struct CudfTableStream { + table_stream: FFI_TableStreamPtr, +} + +impl AsRawPtrAddr for CudfTableStream { + fn as_raw_ptr_addr(&self) -> usize { + self.table_stream.as_raw_ptr_addr() + } +} + +impl CudfTableStream { + pub fn from_raw_table_stream(table_stream: FFI_TableStreamPtr) -> Self { + Self { table_stream } + } + /// Polls has_next() and then next() on the native stream; Ok(None) once it reports no more tables. + fn next_impl(&mut self) -> Result<Option<CudfTable>> { + let has_next = unsafe { + ffi_table_stream_has_next(self.as_raw_ptr_addr()) + .into_result(format!("error invoking table_stream.has_next()"))? + }; + if has_next { + let raw_table = unsafe { + ffi_table_stream_next(self.as_raw_ptr_addr()) + .into_result(format!("error invoking table_stream.next()"))? 
+ }; + return Ok(Some(CudfTable::from_raw_table(raw_table))); + } + Ok(None) + } +} +// Native errors surface as Some(Err(..)) from next(). +impl Iterator for CudfTableStream { + type Item = Result<CudfTable>; + + fn next(&mut self) -> Option<Self::Item> { + self.next_impl().transpose() + } +} +/// Handle to a single native table (a `cudf::table`, per the conversion error message below). +pub struct CudfTable { + table: FFI_TablePtr, +} + +impl AsRawPtrAddr for CudfTable { + fn as_raw_ptr_addr(&self) -> usize { + self.table.as_raw_ptr_addr() + } +} + +impl CudfTable { + pub fn from_raw_table(table: FFI_TablePtr) -> Self { + Self { table } + } + /// Export the table through the Arrow C data interface and rebuild it as a RecordBatch typed by `schema`. + pub fn to_arrow_record_batch(&self, schema: SchemaRef) -> Result<RecordBatch> { + let schema_type = DataType::Struct(schema.fields().clone()); + let mut ffi_schema = FFI_ArrowSchema::try_from(schema_type.clone())?; + let mut ffi_array = FFI_ArrowArray::empty(); + let array_data = unsafe { + ffi_table_to_arrow(self.as_raw_ptr_addr(), &mut ffi_schema, &mut ffi_array) + .into_result(format!( + "error converting cudf::table to arrow record batch" + ))?; + from_ffi_and_data_type(ffi_array, schema_type)? + }; + Ok(RecordBatch::from( // NOTE: schema-level metadata of `schema` is not carried into the batch + make_array(array_data).as_struct().clone(), + )) + } +}
diff --git a/native-engine/blaze-jni-bridge/src/conf.rs b/native-engine/blaze-jni-bridge/src/conf.rs index a00a18e..e695d14 100644 --- a/native-engine/blaze-jni-bridge/src/conf.rs +++ b/native-engine/blaze-jni-bridge/src/conf.rs
@@ -51,6 +51,7 @@ define_conf!(IntConf, SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE); define_conf!(BooleanConf, ORC_FORCE_POSITIONAL_EVOLUTION); define_conf!(IntConf, UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG); +define_conf!(BooleanConf, ENABLE_CUDA); pub trait BooleanConf { fn key(&self) -> &'static str;
diff --git a/native-engine/datafusion-ext-commons/src/hadoop_fs.rs b/native-engine/datafusion-ext-commons/src/hadoop_fs.rs index fe6353a..5401ac0 100644 --- a/native-engine/datafusion-ext-commons/src/hadoop_fs.rs +++ b/native-engine/datafusion-ext-commons/src/hadoop_fs.rs
@@ -39,6 +39,8 @@ pub fn mkdirs(&self, path: &str) -> Result<()> { let _timer = self.io_time.timer(); + log::info!("hdfs mkdirs: {path}"); + let path_str = jni_new_string!(path)?; let path_uri = jni_new_object!(JavaURI(path_str.as_obj()))?; let path = jni_new_object!(HadoopPath(path_uri.as_obj()))?; @@ -53,6 +55,8 @@ pub fn open(&self, path: &str) -> Result<Arc<FsDataInputWrapper>> { let _timer = self.io_time.timer(); + log::info!("hdfs open: {path}"); + let path = jni_new_string!(path)?; let wrapper = jni_call_static!( JniBridge.openFileAsDataInputWrapper(self.fs.as_obj(), path.as_obj()) -> JObject @@ -66,6 +70,8 @@ pub fn create(&self, path: &str) -> Result<Arc<FsDataOutputWrapper>> { let _timer = self.io_time.timer(); + log::info!("hdfs create: {path}"); + let path = jni_new_string!(path)?; let wrapper = jni_call_static!( JniBridge.createFileAsDataOutputWrapper(self.fs.as_obj(), path.as_obj()) -> JObject
diff --git a/native-engine/datafusion-ext-plans/Cargo.toml b/native-engine/datafusion-ext-plans/Cargo.toml index 8ae69f5..b65fb91 100644 --- a/native-engine/datafusion-ext-plans/Cargo.toml +++ b/native-engine/datafusion-ext-plans/Cargo.toml
@@ -11,6 +11,7 @@ arrow = { workspace = true } arrow-schema = { workspace = true } blaze-jni-bridge = { workspace = true } +blaze-cudf-bridge = { workspace = true } datafusion = { workspace = true } datafusion-ext-commons = { workspace = true } datafusion-ext-exprs = { workspace = true }
diff --git a/native-engine/datafusion-ext-plans/src/common/execution_context.rs b/native-engine/datafusion-ext-plans/src/common/execution_context.rs index ccb6076..772b130 100644 --- a/native-engine/datafusion-ext-plans/src/common/execution_context.rs +++ b/native-engine/datafusion-ext-plans/src/common/execution_context.rs
@@ -22,7 +22,11 @@ }; use arrow::{array::RecordBatch, datatypes::SchemaRef}; -use blaze_jni_bridge::{conf, conf::BooleanConf, is_task_running}; +use blaze_cudf_bridge::plans::{split_table::CudfSplitTablePlan, CudfPlan}; +use blaze_jni_bridge::{ + conf::{self, BooleanConf}, + is_task_running, +}; use datafusion::{ common::Result, execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}, @@ -44,6 +48,7 @@ use crate::{ common::{column_pruning::ExecuteWithColumnPruning, timer_helper::TimerHelper}, + cudf::plan::convert_datafusion_plan_to_cudf, memmgr::metrics::SpillMetrics, }; @@ -240,6 +245,49 @@ input.execute_projected(self.partition_id, self.task_ctx.clone(), projection) } + pub fn execute_with_cudf( // runs `plan` through blaze-cudf-bridge; returns Err when CUDA is disabled or the plan cannot be converted, so callers can fall back + self: &Arc<Self>, + plan: &dyn ExecutionPlan, + ) -> Result<SendableRecordBatchStream> { + if !conf::ENABLE_CUDA.value().unwrap_or(false) { + return df_execution_err!("blaze CUDA support is not enabled"); + } + let exec_ctx = self.clone(); + let cudf_elapsed_compute = exec_ctx.register_timer_metric("cudf_elapsed_compute"); + let cudf_plan = convert_datafusion_plan_to_cudf(plan).inspect_err(|err| { + log::info!("convert plan to cudf-bridge error: {err}"); + })?; + + Ok(exec_ctx + .clone() + .output_with_sender("CudfStream", move |sender| async move { + let _cudf_timer = cudf_elapsed_compute.timer(); + sender.exclude_time(&cudf_elapsed_compute); + + log::info!("****** executing with Blaze + CUDA (libcudf) ******"); + + let cudf_plan: Arc<dyn CudfPlan> = // wrapped so the native output is split by batch_size() + Arc::new(CudfSplitTablePlan::new(cudf_plan, batch_size())); + let mut cudf_table_stream = Box::pin(cudf_plan.execute().inspect_err(|err| { + log::info!("executing cudf-bridge plan error: {err}"); + })?); + + while let Some(batch) = { + let output_schema = exec_ctx.output_schema(); // fetched per batch because to_arrow_record_batch takes it by value + tokio::task::block_in_place(|| -> Result<Option<RecordBatch>> { // native calls block; block_in_place requires the multi-threaded tokio runtime + if let Some(cudf_table) = cudf_table_stream.as_mut().next().transpose()? 
{ + return Ok(Some(cudf_table.to_arrow_record_batch(output_schema)?)); + } + Ok(None) + })? + } { + exec_ctx.baseline_metrics().record_output(batch.num_rows()); + sender.send(batch).await; + } + Ok(()) + })) + } + pub fn stat_input( self: &Arc<Self>, input: SendableRecordBatchStream,
diff --git a/native-engine/datafusion-ext-plans/src/cudf/expr.rs b/native-engine/datafusion-ext-plans/src/cudf/expr.rs new file mode 100644 index 0000000..3293b17 --- /dev/null +++ b/native-engine/datafusion-ext-plans/src/cudf/expr.rs
@@ -0,0 +1,99 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use blaze_cudf_bridge::exprs::{CudfExprBuilder, CudfExprOperator, CudfScalar}; +use datafusion::{ + error::Result, + logical_expr::Operator, + physical_expr::PhysicalExprRef, + physical_plan::expressions::{ + BinaryExpr, Column, IsNotNullExpr, IsNullExpr, Literal, NegativeExpr, + }, + scalar::ScalarValue, +}; +use datafusion_ext_commons::{df_unimplemented_err, downcast_any}; +/// Translate a DataFusion physical expression into a new `CudfExprBuilder`; columns are referenced by name when `use_column_name` is true, otherwise by index. +pub fn convert_datafusion_expr_to_cudf( + de: &PhysicalExprRef, + use_column_name: bool, +) -> Result<CudfExprBuilder> { + let root_expr_builder = CudfExprBuilder::new(); + let _ = convert_impl(de, &root_expr_builder, use_column_name)?; + Ok(root_expr_builder) +} +/// Recursively append `de` to `b`, returning the id of the node added for `de` itself; unsupported expressions yield an "unimplemented" error. +fn convert_impl(de: &PhysicalExprRef, b: &CudfExprBuilder, use_column_name: bool) -> Result<usize> { + if let Ok(lit) = downcast_any!(de, Literal) { + let cudf_scalar = match lit.value() { + ScalarValue::Boolean(Some(v)) => CudfScalar::new_bool(*v), + ScalarValue::Int8(Some(v)) => CudfScalar::new_prim(*v), + ScalarValue::Int16(Some(v)) => CudfScalar::new_prim(*v), + ScalarValue::Int32(Some(v)) => CudfScalar::new_prim(*v), + ScalarValue::Int64(Some(v)) => CudfScalar::new_prim(*v), + ScalarValue::UInt8(Some(v)) => CudfScalar::new_prim(*v), + ScalarValue::UInt16(Some(v)) => CudfScalar::new_prim(*v), + ScalarValue::UInt32(Some(v)) => CudfScalar::new_prim(*v), + ScalarValue::UInt64(Some(v)) => CudfScalar::new_prim(*v), + 
ScalarValue::Float32(Some(v)) => CudfScalar::new_prim(*v), + ScalarValue::Float64(Some(v)) => CudfScalar::new_prim(*v), + ScalarValue::Utf8(Some(v)) => CudfScalar::new_string(v), + _ => return df_unimplemented_err!("cannot convert scalar to cudf: {lit:?}"), // also covers every null literal + }; + return Ok(b.add_literal(&cudf_scalar)); + } else if let Ok(col) = downcast_any!(de, Column) { + if use_column_name { + return Ok(b.add_column_name_ref(col.name())); + } else { + return Ok(b.add_column_ref(col.index())); + } + } else if let Ok(neg) = downcast_any!(de, NegativeExpr) { + let arg_expr_id = convert_impl(neg.arg(), b, use_column_name)?; + return Ok(b.add_unary(arg_expr_id, CudfExprOperator::SUB)); // NOTE(review): in libcudf's AST, SUB is a binary operator (no unary negate); confirm the bridge accepts it here + } else if let Ok(binary) = downcast_any!(de, BinaryExpr) { + let left_expr_id = convert_impl(binary.left(), b, use_column_name)?; + let right_expr_id = convert_impl(binary.right(), b, use_column_name)?; + let op = convert_op(binary.op())?; + return Ok(b.add_binary(left_expr_id, right_expr_id, op)); + } else if let Ok(is_null) = downcast_any!(de, IsNullExpr) { + let arg_expr_id = convert_impl(is_null.arg(), b, use_column_name)?; + return Ok(b.add_unary(arg_expr_id, CudfExprOperator::IS_NULL)); + } else if let Ok(is_not_null) = downcast_any!(de, IsNotNullExpr) { + let arg_expr_id = convert_impl(is_not_null.arg(), b, use_column_name)?; + let is_null_expr_id = b.add_unary(arg_expr_id, CudfExprOperator::IS_NULL); + return Ok(b.add_unary(is_null_expr_id, CudfExprOperator::NOT)); + } + df_unimplemented_err!("cannot convert expr to cudf: {de:?}") +} +/// Map a DataFusion binary operator to the corresponding `CudfExprOperator`; unsupported operators yield an "unimplemented" error. +fn convert_op(op: &Operator) -> Result<CudfExprOperator> { + Ok(match op { + Operator::Eq => CudfExprOperator::EQUAL, + Operator::NotEq => CudfExprOperator::NOT_EQUAL, + Operator::Lt => CudfExprOperator::LESS, + Operator::LtEq => CudfExprOperator::LESS_EQUAL, + Operator::Gt => CudfExprOperator::GREATER, + Operator::GtEq => CudfExprOperator::GREATER_EQUAL, + Operator::Plus => CudfExprOperator::ADD, + Operator::Minus => CudfExprOperator::SUB, + 
Operator::Multiply => CudfExprOperator::MUL, + Operator::Divide => CudfExprOperator::DIV, + Operator::Modulo => CudfExprOperator::MOD, + Operator::And => CudfExprOperator::LOGICAL_AND, + Operator::Or => CudfExprOperator::LOGICAL_OR, // NOTE(review): SQL AND/OR are three-valued (true OR null is true); check whether NULL_LOGICAL_AND/NULL_LOGICAL_OR are needed + Operator::BitwiseAnd => CudfExprOperator::BITWISE_AND, + Operator::BitwiseOr => CudfExprOperator::BITWISE_OR, + Operator::BitwiseXor => CudfExprOperator::BITWISE_XOR, + other => return df_unimplemented_err!("cannot convert operator to cudf: {other:?}"), + }) +}
diff --git a/native-engine/datafusion-ext-plans/src/cudf/mod.rs b/native-engine/datafusion-ext-plans/src/cudf/mod.rs new file mode 100644 index 0000000..048bcb3 --- /dev/null +++ b/native-engine/datafusion-ext-plans/src/cudf/mod.rs
@@ -0,0 +1,16 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//! Conversion of DataFusion physical expressions (`expr`) and execution plans (`plan`) into blaze-cudf-bridge objects. +pub mod expr; +pub mod plan;
diff --git a/native-engine/datafusion-ext-plans/src/cudf/plan.rs b/native-engine/datafusion-ext-plans/src/cudf/plan.rs new file mode 100644 index 0000000..d015119 --- /dev/null +++ b/native-engine/datafusion-ext-plans/src/cudf/plan.rs
@@ -0,0 +1,209 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use blaze_cudf_bridge::{ + io::CudfDataSource, + plans::{ + filter::CudfFilterPlan, parquet_scan::CudfParquetScanPlan, project::CudfProjectPlan, + union::CudfUnionPlan, CudfPlan, CudfPlanRef, + }, +}; +use datafusion::{ + datasource::listing::FileRange, + error::Result, + logical_expr::Operator, + physical_expr::PhysicalExprRef, + physical_plan::{ + expressions::{lit, BinaryExpr, Column, Literal, SCAndExpr}, + metrics::Time, + ExecutionPlan, + }, +}; +use datafusion_ext_commons::{df_unimplemented_err, downcast_any}; + +use super::expr::convert_datafusion_expr_to_cudf; +use crate::{ + filter_exec::FilterExec, parquet_exec::ParquetExec, project_exec::ProjectExec, + rename_columns_exec::RenameColumnsExec, scan::internal_file_reader::InternalFileReader, +}; +/// Convert a supported DataFusion plan (ParquetExec, RenameColumnsExec, ProjectExec, FilterExec, recursively) into a cudf-bridge plan; anything else yields an "unimplemented" error. +pub fn convert_datafusion_plan_to_cudf(exec: &dyn ExecutionPlan) -> Result<CudfPlanRef> { + if let Ok(exec) = downcast_any!(exec, ParquetExec) { + return convert_parquet_scan(exec); + } + if let Ok(exec) = downcast_any!(exec, RenameColumnsExec) { + return convert_rename_columns(exec); + } + if let Ok(exec) = downcast_any!(exec, ProjectExec) { + return convert_project(exec); + } + if let Ok(exec) = downcast_any!(exec, FilterExec) { + return convert_filter(exec); + } + df_unimplemented_err!("cudf not supported for plan: {exec:?}") +} +/// Build one cudf Parquet scan per file range (unioned if there are several), pushing down only `column <op> literal` conjuncts of the scan predicate. +fn convert_parquet_scan(exec: &ParquetExec) 
-> Result<CudfPlanRef> { + let io_time = Time::new(); + let config = exec.file_scan_config(); + let fs_provider = exec.get_fs_provider(&io_time)?; + let mut inputs = vec![]; + + for part_file in config.file_groups.iter().flatten() { + if !part_file.partition_values.is_empty() { + // return an error instead of panicking so that callers can fall back to the CPU path + let values = &part_file.partition_values; + return df_unimplemented_err!("partition tables are not yet supported by cudf: {values:?}"); + } + let range = part_file.range.clone().unwrap_or(FileRange { + start: 0, + end: i64::MAX, // no explicit range: scan to the end of the file + }); + let object_meta = part_file.object_meta.clone(); + inputs.push((object_meta, range)); + } + // Adapts InternalFileReader to CudfDataSource; read() fills the whole buffer or returns an error. + struct InternalFileDataSource { + file_reader: InternalFileReader, + file_size: usize, + } + impl CudfDataSource for InternalFileDataSource { + fn data_size(&self) -> usize { + self.file_size + } + + fn read(&mut self, offset: usize, buf: &mut [u8]) -> arrow::error::Result<usize> { + let range = offset..offset + buf.len(); + self.file_reader.read_fully_into_buffer(range, buf)?; + Ok(buf.len()) + } + } + + let schema = exec.schema(); + + // cudf only supports <column cmp literal> filters + let predicate_exprs = split_expr_by_logical_ands(exec.predicate_expr().unwrap_or(lit(true))) + .into_iter() + .filter(|expr| { + matches!( + downcast_any!(&expr, BinaryExpr), + Ok(expr) if + downcast_any!(expr.left(), Column).is_ok() && + downcast_any!(expr.right(), Literal).is_ok()) + }) + .collect::<Vec<_>>(); + let predicate_expr = join_exprs_by_logical_ands(&predicate_exprs); + let cudf_predicate_expr = convert_datafusion_expr_to_cudf(&predicate_expr, true)?; + + let mut scan_plans: Vec<Arc<dyn CudfPlan>> = vec![]; + for input in inputs { + let data_source = Box::new(InternalFileDataSource { + file_size: input.0.size, + file_reader: InternalFileReader::try_new(fs_provider.clone(), input.0.clone())?, + }); + let scan_plan = Arc::new(CudfParquetScanPlan::try_new( + data_source, + &cudf_predicate_expr, + input.1.start as usize, + input.1.end as usize - input.1.start as usize, + schema.clone(), + )?); + 
scan_plans.push(scan_plan); + } + + if scan_plans.len() == 1 { + return Ok(scan_plans[0].clone()); + } + Ok(Arc::new(CudfUnionPlan::try_new(&scan_plans, schema)?)) +} +/// Express a column rename as an identity projection by column index; requires input and output fields to match in data type and nullability. +fn convert_rename_columns(exec: &RenameColumnsExec) -> Result<CudfPlanRef> { + let output_schema = exec.schema(); + let input_schema = exec.children()[0].schema(); + + if output_schema.fields().len() != input_schema.fields().len() + || output_schema + .fields() + .iter() + .zip(input_schema.fields()) + .any(|(f1, f2)| f1.data_type() != f2.data_type() || f1.is_nullable() != f2.is_nullable()) // reject if any column would change type or nullability + { + return df_unimplemented_err!( + "cudf-bridge only supports RenameColumnsExec with same data types" + ); + } + + let input = convert_datafusion_plan_to_cudf(exec.children()[0].as_ref())?; + let cudf_project_exprs = (0..output_schema.fields().len()) + .map(|i| { + let col: PhysicalExprRef = Arc::new(Column::new("", i)); + Ok(convert_datafusion_expr_to_cudf(&col, false)?) + }) + .collect::<Result<Vec<_>>>()?; + Ok(Arc::new(CudfProjectPlan::try_new( + input, + &cudf_project_exprs.iter().collect::<Vec<_>>(), + output_schema, + )?)) +} +/// Convert a ProjectExec into a cudf projection over its converted input (columns referenced by index). +fn convert_project(exec: &ProjectExec) -> Result<CudfPlanRef> { + let schema = exec.schema(); + let cudf_exprs = exec + .named_exprs() + .iter() + .map(|(expr, _name)| convert_datafusion_expr_to_cudf(expr, false)) + .collect::<Result<Vec<_>>>()?; + let cudf_input = convert_datafusion_plan_to_cudf(exec.children()[0].as_ref())?; + Ok(Arc::new(CudfProjectPlan::try_new( + cudf_input, + &cudf_exprs.iter().collect::<Vec<_>>(), + schema, + )?)) +} +/// Convert a FilterExec: its predicates are AND-ed into one cudf expression over the converted input. +fn convert_filter(exec: &FilterExec) -> Result<CudfPlanRef> { + let predicate_expr = join_exprs_by_logical_ands(exec.predicates()); + let cudf_predicate_expr = convert_datafusion_expr_to_cudf(&predicate_expr, false)?; + let cudf_input = convert_datafusion_plan_to_cudf(exec.children()[0].as_ref())?; + Ok(Arc::new(CudfFilterPlan::try_new( + cudf_input, + &cudf_predicate_expr, + )?)) +} +/// Flatten nested ANDs (`SCAndExpr` or `BinaryExpr` with `Operator::And`) into a list of conjuncts. +fn split_expr_by_logical_ands(expr: PhysicalExprRef) -> 
Vec<PhysicalExprRef> { + if let Ok(expr) = downcast_any!(&expr, SCAndExpr) { + let splitted_left = split_expr_by_logical_ands(expr.left.clone()); + let splitted_right = split_expr_by_logical_ands(expr.right.clone()); + return [splitted_left, splitted_right].concat(); + } + if let Ok(expr) = downcast_any!(&expr, BinaryExpr) { + if expr.op() == &Operator::And { + let splitted_left = split_expr_by_logical_ands(expr.left().clone()); + let splitted_right = split_expr_by_logical_ands(expr.right().clone()); + return [splitted_left, splitted_right].concat(); + } + } + vec![expr] +} +/// AND the given predicates together; `lit(true)` when the slice is empty. +fn join_exprs_by_logical_ands(expr: &[PhysicalExprRef]) -> PhysicalExprRef { + expr.iter() + .cloned() + .reduce(|pred1, pred2| Arc::new(BinaryExpr::new(pred1, Operator::And, pred2))) + .unwrap_or(lit(true)) +}
diff --git a/native-engine/datafusion-ext-plans/src/filter_exec.rs b/native-engine/datafusion-ext-plans/src/filter_exec.rs index 20cdb4d..18c7f50 100644 --- a/native-engine/datafusion-ext-plans/src/filter_exec.rs +++ b/native-engine/datafusion-ext-plans/src/filter_exec.rs
@@ -127,8 +127,12 @@ partition: usize, context: Arc<TaskContext>, ) -> Result<SendableRecordBatchStream> { - let predicates = self.predicates.clone(); let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics); + if let Ok(stream) = exec_ctx.execute_with_cudf(self) { + return Ok(stream); + } + + let predicates = self.predicates.clone(); let input = exec_ctx.execute_with_input_stats(&self.input)?; let filtered = execute_filter(input, predicates, exec_ctx.clone())?; Ok(exec_ctx.coalesce_with_default_batch_size(filtered))
diff --git a/native-engine/datafusion-ext-plans/src/lib.rs b/native-engine/datafusion-ext-plans/src/lib.rs index 505b1d1..8e56ce8 100644 --- a/native-engine/datafusion-ext-plans/src/lib.rs +++ b/native-engine/datafusion-ext-plans/src/lib.rs
@@ -54,6 +54,9 @@ pub mod common; pub mod generate; pub mod joins; -mod scan; +pub mod scan; pub mod shuffle; pub mod window; + +// cudf integration: converts supported plans/expressions into blaze-cudf-bridge (libcudf) objects +mod cudf;
diff --git a/native-engine/datafusion-ext-plans/src/parquet_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_exec.rs index 26864ee..a4e845a 100644 --- a/native-engine/datafusion-ext-plans/src/parquet_exec.rs +++ b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
@@ -40,7 +40,7 @@ physical_expr::EquivalenceProperties, physical_optimizer::pruning::PruningPredicate, physical_plan::{ - metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, + metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time}, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PhysicalExpr, PlanProperties, SendableRecordBatchStream, Statistics, }, @@ -59,9 +59,10 @@ }; /// Execution plan for scanning one or more Parquet partitions -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct ParquetExec { fs_resource_id: String, + fs_provider: OnceCell<Arc<FsProvider>>, base_config: FileScanConfig, projected_statistics: Statistics, projected_schema: SchemaRef, @@ -72,6 +73,23 @@ props: OnceCell<PlanProperties>, } +// Hand-written `Debug`: `fs_provider` is not printed (FsProvider presumably has no `Debug` impl — TODO confirm); `finish_non_exhaustive` marks the omitted field. +impl std::fmt::Debug for ParquetExec { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetExec") + .field("fs_resource_id", &self.fs_resource_id) + .field("base_config", &self.base_config) + .field("projected_statistics", &self.projected_statistics) + .field("projected_schema", &self.projected_schema) + .field("metrics", &self.metrics) + .field("predicate", &self.predicate) + .field("pruning_predicate", &self.pruning_predicate) + .field("page_pruning_predicate", &self.page_pruning_predicate) + .field("props", &self.props) + .finish_non_exhaustive() + } +} + impl ParquetExec { /// Create a new Parquet reader execution plan provided file list and /// schema. 
@@ -108,6 +126,7 @@ Self { fs_resource_id, + fs_provider: OnceCell::new(), base_config, projected_schema, projected_statistics, @@ -118,6 +137,30 @@ props: OnceCell::new(), } } + + /// Returns the `FsProvider` for this scan. On first use the filesystem object is fetched from the JNI bridge resource named by `fs_resource_id`; the provider is then cached and reused by later calls. + /// + /// NOTE(review): the cached provider keeps the `io_time` timer passed on the *first* call; later callers (e.g. `execute` for other partitions) record IO time into that same timer — confirm this is intended. + pub fn get_fs_provider(&self, io_time: &Time) -> Result<Arc<FsProvider>> { + // get fs object from jni bridge resource + let fs_provider = self.fs_provider.get_or_try_init(|| { + let resource_id = jni_new_string!(&self.fs_resource_id)?; + let fs = jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?; + let fs_provider = Arc::new(FsProvider::new(jni_new_global_ref!(fs.as_obj())?, io_time)); + Ok::<_, DataFusionError>(fs_provider) + })?; + Ok(fs_provider.clone()) + } + + /// Returns the `FileScanConfig` this plan was created with. + pub fn file_scan_config(&self) -> &FileScanConfig { + &self.base_config + } + + /// Returns a clone of the optional filter predicate this scan was created with. + pub fn predicate_expr(&self) -> Option<Arc<dyn PhysicalExpr>> { + self.predicate.clone() + } } impl DisplayAs for ParquetExec { @@ -184,15 +227,16 @@ context: Arc<TaskContext>, ) -> Result<SendableRecordBatchStream> { let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics); + // Try the cudf (GPU) path first; on any error fall back to the regular implementation below. + // NOTE(review): the error returned by the cudf path is discarded without logging. + if let Ok(stream) = exec_ctx.execute_with_cudf(self) { + return Ok(stream); + } + let elapsed_compute = exec_ctx.baseline_metrics().elapsed_compute().clone(); let _timer = elapsed_compute.timer(); let io_time = exec_ctx.register_timer_metric("io_time"); - // get fs object from jni bridge resource - let resource_id = jni_new_string!(&self.fs_resource_id)?; - let fs = jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?; - let fs_provider = Arc::new(FsProvider::new(jni_new_global_ref!(fs.as_obj())?, &io_time)); - let schema_adapter_factory = Arc::new(BlazeSchemaAdapterFactory); let projection = match self.base_config.file_column_projection_indices() { Some(proj) => proj, @@ -213,7 +257,9 @@ table_schema: self.base_config.file_schema.clone(), metadata_size_hint: None, metrics: self.metrics.clone(), - parquet_file_reader_factory: Arc::new(FsReaderFactory::new(fs_provider)), + parquet_file_reader_factory: 
Arc::new(FsReaderFactory::new( + self.get_fs_provider(&io_time)?, + )), pushdown_filters: page_filtering_enabled, reorder_filters: page_filtering_enabled, enable_page_index: page_filtering_enabled,
diff --git a/native-engine/datafusion-ext-plans/src/project_exec.rs b/native-engine/datafusion-ext-plans/src/project_exec.rs index 86e96a2..4098b07 100644 --- a/native-engine/datafusion-ext-plans/src/project_exec.rs +++ b/native-engine/datafusion-ext-plans/src/project_exec.rs
@@ -78,6 +78,11 @@ props: OnceCell::new(), }) } + /// Returns the projected `(expression, output column name)` pairs of this plan. + pub fn named_exprs(&self) -> &[(PhysicalExprRef, String)] { + &self.expr + } } impl DisplayAs for ProjectExec { @@ -136,8 +141,12 @@ context: Arc<TaskContext>, ) -> Result<SendableRecordBatchStream> { let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics); - let exprs: Vec<PhysicalExprRef> = self.expr.iter().map(|(e, _name)| e.clone()).collect(); + // Try the cudf (GPU) path first; on any error fall back to the regular implementation below. + if let Ok(stream) = exec_ctx.execute_with_cudf(self) { + return Ok(stream); + } + let exprs: Vec<PhysicalExprRef> = self.expr.iter().map(|(e, _name)| e.clone()).collect(); let output = if let Ok(filter_exec) = downcast_any!(self.input, FilterExec) { execute_project_with_filtering( filter_exec.children()[0].clone(),
diff --git a/native-engine/datafusion-ext-plans/src/scan/internal_file_reader.rs b/native-engine/datafusion-ext-plans/src/scan/internal_file_reader.rs index 507d2ba..c1d7c13 100644 --- a/native-engine/datafusion-ext-plans/src/scan/internal_file_reader.rs +++ b/native-engine/datafusion-ext-plans/src/scan/internal_file_reader.rs
@@ -61,6 +61,16 @@ Ok(input.clone()) } + /// Reads the bytes of `range` into the caller-provided `buf`, avoiding the + /// buffer allocation that `read_fully` performs. + /// + /// Panics if `range.len() != buf.len()`. + pub fn read_fully_into_buffer(&self, range: Range<usize>, buf: &mut [u8]) -> Result<()> { + assert_eq!(range.len(), buf.len()); + self.get_input()?.read_fully(range.start as u64, buf)?; + Ok(()) + } + pub fn read_fully(&self, range: Range<usize>) -> Result<Bytes> { let mut bytes = vec![0u8; range.len()]; self.get_input()?
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/uniffle/BlazeUniffleShuffleReader.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/uniffle/BlazeUniffleShuffleReader.scala index 79ab98d..4415570 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/uniffle/BlazeUniffleShuffleReader.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/uniffle/BlazeUniffleShuffleReader.scala
@@ -89,12 +89,14 @@ FieldUtils.readField(reader, "readMetrics", true).asInstanceOf[ShuffleReadMetrics] override protected def readBlocks(): Iterator[InputStream] = { - logInfo(s"Shuffle read started: " + - s"appId=$appId" + - s", shuffleId=$shuffleId" + - s", taskId=$taskId" + - s", partitions: [$startPartition, $endPartition)" + - s", maps: [$startMapIndex, $endMapIndex)") + // Log the app/shuffle/task ids and the half-open partition and map ranges being read. + logInfo( + s"Shuffle read started: " + + s"appId=$appId" + + s", shuffleId=$shuffleId" + + s", taskId=$taskId" + + s", partitions: [$startPartition, $endPartition)" + + s", maps: [$startMapIndex, $endMapIndex)") val inputStream = new UniffleInputStream(
diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java index f3e324b..47fe0bc 100644 --- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java +++ b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
@@ -103,7 +103,10 @@ // batches in memory at the same time SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE("spark.blaze.suggested.batch.memSize.multiwayMerging", 1048576), - ORC_FORCE_POSITIONAL_EVOLUTION("spark.blaze.orc.force.positional.evolution", false); + ORC_FORCE_POSITIONAL_EVOLUTION("spark.blaze.orc.force.positional.evolution", false), + + // enable CUDA support (GPU acceleration via libcudf); disabled by default + ENABLE_CUDA("spark.blaze.enable.cuda", false); public final String key; private final Object defaultValue;
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala index 751b1cb..adca738 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala
@@ -99,6 +99,8 @@ "output_rows" -> metric("Native.output_rows"), "output_batches" -> metric("Native.output_batches"), "elapsed_compute" -> nanoTimingMetric("Native.elapsed_compute"), + // elapsed compute time of the native cudf (GPU) execution path + "cudf_elapsed_compute" -> nanoTimingMetric("Native.cudf_elapsed_compute"), "build_hash_map_time" -> nanoTimingMetric("Native.build_hash_map_time"), "probed_side_hash_time" -> nanoTimingMetric("Native.probed_side_hash_time"), "probed_side_search_time" -> nanoTimingMetric("Native.probed_side_search_time"), @@ -125,6 +127,10 @@ private def getDefaultNativeFileMetrics(sc: SparkContext): Map[String, SQLMetric] = { TreeMap( "bytes_scanned" -> SQLMetrics.createSizeMetric(sc, "Native.bytes_scanned"), + // elapsed compute time of the native cudf (GPU) execution path + "cudf_elapsed_compute" -> SQLMetrics.createNanoTimingMetric( + sc, + "Native.cudf_elapsed_compute"), "io_time" -> SQLMetrics.createNanoTimingMetric(sc, "Native.io_time"), "io_time_getfs" -> SQLMetrics.createNanoTimingMetric(sc, "Native.io_time_getfs"), // Parquet metrics
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterBase.scala index 136e7b9..ea8b5d1 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterBase.scala
@@ -52,6 +52,8 @@ "stage_id", "output_rows", "elapsed_compute", + // elapsed compute time of the native cudf (GPU) execution path + "cudf_elapsed_compute", "input_batch_count", "input_batch_mem_size", "input_row_count"))
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectBase.scala index 852fff5..1ccdc30 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectBase.scala
@@ -52,6 +52,8 @@ "stage_id", "output_rows", "elapsed_compute", + // elapsed compute time of the native cudf (GPU) execution path + "cudf_elapsed_compute", "input_batch_count", "input_batch_mem_size", "input_row_count"))