Launch Spark / Doris / Hive / Hudi / MinIO test environments, and walk through examples of querying Hudi tables in Doris.
Create the Docker network:

```shell
sudo docker network create -d bridge hudi-net
```

Launch all components in Docker:

```shell
sudo ./start-hudi-compose.sh
```

Log in to Spark:

```shell
sudo ./login-spark.sh
```

Log in to Doris:

```shell
sudo ./login-doris.sh
```
There is already a Hive table named `customer` in the Hive `default` database. Create Hudi tables from it:
```sql
-- ./login-spark.sh
use default;

-- create a COW table
CREATE TABLE customer_cow
USING hudi
TBLPROPERTIES (
  type = 'cow',
  primaryKey = 'c_custkey',
  preCombineField = 'c_name'
)
PARTITIONED BY (c_nationkey)
AS SELECT * FROM customer;

-- create a MOR table
CREATE TABLE customer_mor
USING hudi
TBLPROPERTIES (
  type = 'mor',
  primaryKey = 'c_custkey',
  preCombineField = 'c_name'
)
PARTITIONED BY (c_nationkey)
AS SELECT * FROM customer;
```
Doris refreshes the Hive catalog every 10 minutes by default. Users can refresh it manually to access the Hudi tables in Doris immediately:

```sql
doris> refresh catalog hive;
```
Once the Hudi tables are available in Doris, all subsequent operations on them will be detected by Doris automatically, and there is no need to refresh the catalog or tables again.
Insert new data into the Hudi tables in spark-sql:
```sql
spark-sql> insert into customer_cow values (100, "Customer#000000100", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 25);
spark-sql> insert into customer_mor values (100, "Customer#000000100", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 25);
```
`c_nationkey=25` is a new partition; Doris can query the new data immediately without a refresh:
```sql
doris> use hive.default;
doris> select * from customer_cow where c_custkey = 100;
doris> select * from customer_mor where c_custkey = 100;
```
Inserting a record with `c_custkey=32` (a primary key already present in the table) will replace the old record:
```sql
spark-sql> insert into customer_cow values (32, "Customer#000000032_update", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 15);
spark-sql> insert into customer_mor values (32, "Customer#000000032_update", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 15);
```
Query the updated data immediately in Doris:
```sql
doris> select * from customer_cow where c_custkey = 32;
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| c_custkey | c_name                    | c_address | c_phone         | c_acctbal | c_mktsegment | c_comment                           | c_nationkey |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
|        32 | Customer#000000032_update | jD2xZzi   | 25-430-914-2194 |   3471.59 | BUILDING     | cial ideas. final, furious requests |          15 |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
doris> select * from customer_mor where c_custkey = 32;
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| c_custkey | c_name                    | c_address | c_phone         | c_acctbal | c_mktsegment | c_comment                           | c_nationkey |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
|        32 | Customer#000000032_update | jD2xZzi   | 25-430-914-2194 |   3471.59 | BUILDING     | cial ideas. final, furious requests |          15 |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
```
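The upsert behavior shown above can be thought of as a map keyed by the primary key: writing a row whose key already exists replaces the previous version. A minimal Python sketch of this semantics (illustrative only, not Hudi's actual write path):

```python
# Illustrative model of Hudi upsert semantics: the table acts like a map
# keyed by the primary key (c_custkey); inserting a row with an existing
# key replaces the previous row version instead of adding a duplicate.
table = {}

def upsert(row):
    table[row["c_custkey"]] = row

upsert({"c_custkey": 32, "c_name": "Customer#000000032"})
upsert({"c_custkey": 100, "c_name": "Customer#000000100"})
upsert({"c_custkey": 32, "c_name": "Customer#000000032_update"})

print(table[32]["c_name"])  # -> Customer#000000032_update (old record replaced)
print(len(table))           # -> 2 (no duplicate for key 32)
```

This is why the query above returns a single row for `c_custkey = 32` rather than two versions.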
Doris uses the native (C++) reader to read the data files of the COW table, and the Java SDK (calling hudi-bundle through JNI) to read the data files of the MOR table. In upsert scenarios, the MOR table may still contain base files that have not been updated, and these can be read through the native reader. Users can view the execution plan of a Hudi scan with the `explain` command, where `hudiNativeReadSplits` indicates how many splits are read through the native reader.
```sql
-- COW table is read natively
doris> explain select * from customer_cow where c_custkey = 32;
|   0:VHUDI_SCAN_NODE(68)                                        |
|      table: customer_cow                                       |
|      predicates: (c_custkey[#5] = 32)                          |
|      inputSplitNum=101, totalFileSize=45338886, scanRanges=101 |
|      partition=26/26                                           |
|      cardinality=1, numNodes=1                                 |
|      pushdown agg=NONE                                         |
|      hudiNativeReadSplits=101/101                              |

-- MOR table: only the base file containing the updated `c_custkey = 32` row has a log file,
-- so 100 splits are read natively, while the split with the log file is read through JNI.
doris> explain select * from customer_mor where c_custkey = 32;
|   0:VHUDI_SCAN_NODE(68)                                        |
|      table: customer_mor                                       |
|      predicates: (c_custkey[#5] = 32)                          |
|      inputSplitNum=101, totalFileSize=45340731, scanRanges=101 |
|      partition=26/26                                           |
|      cardinality=1, numNodes=1                                 |
|      pushdown agg=NONE                                         |
|      hudiNativeReadSplits=100/101                              |

-- Use the delete statement to see more differences
spark-sql> delete from customer_cow where c_custkey = 64;
doris> explain select * from customer_cow where c_custkey = 64;

spark-sql> delete from customer_mor where c_custkey = 64;
doris> explain select * from customer_mor where c_custkey = 64;

-- customer_xxx is partitioned by c_nationkey, so the partition column can be used to prune data
doris> explain select * from customer_mor where c_custkey = 64 and c_nationkey = 15;
|   0:VHUDI_SCAN_NODE(68)                                        |
|      table: customer_mor                                       |
|      predicates: (c_custkey[#5] = 64), (c_nationkey[#12] = 15) |
|      inputSplitNum=4, totalFileSize=1798186, scanRanges=4      |
|      partition=1/26                                            |
|      cardinality=1, numNodes=1                                 |
|      pushdown agg=NONE                                         |
|      hudiNativeReadSplits=3/4                                  |
```
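When scripting checks against such plans, the native-read fraction can be pulled out of the explain text with a small helper. This is a sketch of an assumed workflow (the `native_read_ratio` helper is hypothetical; only the `hudiNativeReadSplits=native/total` field name comes from the plan output above):

```python
import re

def native_read_ratio(explain_text: str) -> float:
    # "hudiNativeReadSplits=native/total" appears in the Hudi scan node
    m = re.search(r"hudiNativeReadSplits=(\d+)/(\d+)", explain_text)
    if m is None:
        raise ValueError("no hudiNativeReadSplits found in plan")
    native, total = int(m.group(1)), int(m.group(2))
    return native / total

plan = "|      hudiNativeReadSplits=100/101 |"
print(f"{native_read_ratio(plan):.2%} of splits read natively")  # -> 99.01% ...
```

A ratio below 100% on a MOR table simply means some splits carry log files and go through the JNI reader.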
View the commit metadata in spark-sql:
```sql
spark-sql> call show_commits(table => 'customer_cow', limit => 10);
20240603033556094	20240603033558249	commit	448833	0	1	1	183	0	0
20240603015444737	20240603015446588	commit	450238	0	1	1	202	1	0
20240603015018572	20240603015020503	commit	436692	1	0	1	1	0	0
20240603013858098	20240603013907467	commit	44902033	100	0	25	18751	0	0

spark-sql> call show_commits(table => 'customer_mor', limit => 10);
20240603033745977	20240603033748021	deltacommit	1240	0	1	1	0	0	0
20240603015451860	20240603015453539	deltacommit	1434	0	1	1	1	1	0
20240603015058442	20240603015100120	deltacommit	436691	1	0	1	1	0	0
20240603013918515	20240603013922961	deltacommit	44904040	100	0	25	18751	0	0
```
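The first two columns above are Hudi instant times in the `yyyyMMddHHmmssSSS` format (millisecond precision). A small sketch using only the Python standard library to decode them into timestamps:

```python
from datetime import datetime

def parse_instant(instant: str) -> datetime:
    # Hudi instant times are yyyyMMddHHmmssSSS; strptime's %f accepts the
    # trailing 3-digit millisecond field and right-pads it to microseconds.
    return datetime.strptime(instant, "%Y%m%d%H%M%S%f")

print(parse_instant("20240603015018572"))  # -> 2024-06-03 01:50:18.572000
```

Because the format is zero-padded and fixed-width, the raw instant strings also sort chronologically as plain strings, which is why they can be compared directly in time-travel predicates.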
Let's travel back to the commit in which we inserted `c_custkey=100`, when `c_custkey=32` had not yet been updated, in Doris:
```sql
doris> select * from customer_cow for time as of '20240603015018572' where c_custkey = 32 or c_custkey = 100;
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
| c_custkey | c_name             | c_address                             | c_phone         | c_acctbal | c_mktsegment | c_comment                                        | c_nationkey |
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
|        32 | Customer#000000032 | jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J | 25-430-914-2194 |   3471.53 | BUILDING     | cial ideas. final, furious requests across the e |          15 |
|       100 | Customer#000000100 | jD2xZzi                               | 25-430-914-2194 |   3471.59 | BUILDING     | cial ideas. final, furious requests              |          25 |
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
-- compare with spark-sql
spark-sql> select * from customer_cow timestamp as of '20240603015018572' where c_custkey = 32 or c_custkey = 100;

doris> select * from customer_mor for time as of '20240603015058442' where c_custkey = 32 or c_custkey = 100;
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
| c_custkey | c_name             | c_address                             | c_phone         | c_acctbal | c_mktsegment | c_comment                                        | c_nationkey |
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
|       100 | Customer#000000100 | jD2xZzi                               | 25-430-914-2194 |   3471.59 | BUILDING     | cial ideas. final, furious requests              |          25 |
|        32 | Customer#000000032 | jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J | 25-430-914-2194 |   3471.53 | BUILDING     | cial ideas. final, furious requests across the e |          15 |
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
spark-sql> select * from customer_mor timestamp as of '20240603015058442' where c_custkey = 32 or c_custkey = 100;
```
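Conceptually, `for time as of` reconstructs a snapshot: for each primary key, the latest row version whose commit time is not later than the requested instant. A simplified model of this (illustrative only, not Hudi's actual timeline logic):

```python
def snapshot_as_of(history, ts):
    # history: list of (commit_time, primary_key, row); commit times are
    # fixed-width yyyyMMddHHmmssSSS strings, so they sort chronologically.
    snap = {}
    for commit_time, key, row in sorted(history):
        if commit_time <= ts:
            snap[key] = row  # later commits overwrite earlier versions
    return snap

# Commit times taken from the demo above (rows abbreviated to c_name):
history = [
    ("20240603013858098", 32, "Customer#000000032"),         # initial load
    ("20240603015018572", 100, "Customer#000000100"),        # insert key 100
    ("20240603015444737", 32, "Customer#000000032_update"),  # update key 32
]
print(snapshot_as_of(history, "20240603015018572"))
# -> {32: 'Customer#000000032', 100: 'Customer#000000100'}
```

This matches the query results above: at the insert commit, key 100 is visible but key 32 still has its original value.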
See the data that changed after inserting `c_custkey=100`:
```sql
doris> select * from customer_cow@incr('beginTime'='20240603015018572');
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| c_custkey | c_name                    | c_address | c_phone         | c_acctbal | c_mktsegment | c_comment                           | c_nationkey |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
|        32 | Customer#000000032_update | jD2xZzi   | 25-430-914-2194 |   3471.59 | BUILDING     | cial ideas. final, furious requests |          15 |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
spark-sql> select * from hudi_table_changes('customer_cow', 'latest_state', '20240603015018572');

doris> select * from customer_mor@incr('beginTime'='20240603015058442');
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| c_custkey | c_name                    | c_address | c_phone         | c_acctbal | c_mktsegment | c_comment                           | c_nationkey |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
|        32 | Customer#000000032_update | jD2xZzi   | 25-430-914-2194 |   3471.59 | BUILDING     | cial ideas. final, furious requests |          15 |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
spark-sql> select * from hudi_table_changes('customer_mor', 'latest_state', '20240603015058442');
```
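The incremental read above can likewise be modeled as "the latest state of every key that changed strictly after `beginTime`". Another simplified sketch (not the real `@incr` / `hudi_table_changes` implementation):

```python
def incremental_read(history, begin_time):
    # history: list of (commit_time, primary_key, row); return the latest
    # state of keys committed strictly after begin_time ("latest_state").
    changed = {}
    for commit_time, key, row in sorted(history):
        if commit_time > begin_time:
            changed[key] = row
    return changed

# Commit times taken from the demo above (rows abbreviated to c_name):
history = [
    ("20240603013858098", 32, "Customer#000000032"),         # initial load
    ("20240603015018572", 100, "Customer#000000100"),        # insert key 100
    ("20240603015444737", 32, "Customer#000000032_update"),  # update key 32
]
print(incremental_read(history, "20240603015018572"))
# -> {32: 'Customer#000000032_update'}
```

This mirrors the output above: only key 32 changed after the insert commit, so it is the only row returned.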