Testing dbt project: hudi_examples_dbt

This dbt project demonstrates Hudi integration with dbt. It contains a few models that show the different ways in which you can create Hudi datasets using dbt.

This directory is a self-contained playground dbt project, useful for testing scripts and for communicating some of the core dbt concepts.


Make sure python3 is installed, then switch to the project directory.

cd hudi-examples/hudi-examples-dbt

Install dbt

Create python virtual environment (Reference).

python3 -m venv dbt-env
source dbt-env/bin/activate
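
Before installing packages, it can be worth confirming that the activated interpreter really is the one inside dbt-env. The following is a minimal sketch using only the standard library; the helper name in_virtualenv is hypothetical and not part of dbt or this project.

```python
import sys


def in_virtualenv() -> bool:
    """Return True when the running interpreter belongs to a virtual environment."""
    # Inside an environment created by `python3 -m venv`, sys.prefix points at
    # the environment directory while sys.base_prefix points at the base install.
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix)


if __name__ == "__main__":
    print("virtual environment active:", in_virtualenv())
    print("interpreter prefix:", sys.prefix)
```

If the first line prints False, run the activate command again in the current shell before calling pip.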

We are using thrift as the connection method (Reference).

python3 -m pip install "dbt-spark[PyHive]"

Configure dbt for Spark

Set up a profile called spark to connect to a spark cluster via thrift server (Reference).

spark:
  target: dev
  outputs:
    dev:
      type: spark
      method: thrift
      schema: hudi_examples_dbt
      host: localhost
      port: 10000
      server_side_parameters:
        "spark.driver.memory": "3g"

If you have access to a data warehouse, you can use those credentials. We recommend setting your target schema to a new schema (dbt will create the schema for you, as long as you have the right privileges). If you don't have access to an existing data warehouse, you can also set up a local Postgres database and connect to it in your profile.

Start Spark Thrift server

NOTE: This guide uses the following versions:

  • Spark 3.2.3 (with Derby)
  • Hudi 0.14.0

Start a local Derby server

wget$DERBY_VERSION/db-derby-$DERBY_VERSION-bin.tar.gz -P /opt/
tar -xf /opt/db-derby-$DERBY_VERSION-bin.tar.gz -C /opt/
export DERBY_HOME=/opt/db-derby-$DERBY_VERSION-bin
$DERBY_HOME/bin/startNetworkServer -h

Start a local Thrift server for Spark

export SPARK_VERSION=3.2.3
wget$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop2.7.tgz -P /opt/
tar -xf /opt/spark-$SPARK_VERSION-bin-hadoop2.7.tgz -C /opt/
export SPARK_HOME=/opt/spark-$SPARK_VERSION-bin-hadoop2.7

# install dependencies
cp $DERBY_HOME/lib/{derby,derbyclient}.jar $SPARK_HOME/jars/
wget -P $SPARK_HOME/jars/

# start Thrift server connecting to Derby as HMS backend
$SPARK_HOME/sbin/ \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
--conf spark.sql.warehouse.dir=/tmp/hudi/hive/warehouse \
--hiveconf hive.metastore.warehouse.dir=/tmp/hudi/hive/warehouse \
--hiveconf hive.metastore.schema.verification=false \
--hiveconf datanucleus.schema.autoCreateAll=true \
--hiveconf javax.jdo.option.ConnectionDriverName=org.apache.derby.jdbc.ClientDriver \
--hiveconf 'javax.jdo.option.ConnectionURL=jdbc:derby://localhost:1527/default;create=true'
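
Both servers take a moment to start, and dbt will fail to connect if they are not listening yet. As a rough sketch, the check below tries a TCP connection to the Thrift port from the profile (10000) and to the Derby port from the JDBC URL (1527). The helper name port_is_open is hypothetical, and a successful connection only shows that something is listening, not that the server is fully initialised.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False


if __name__ == "__main__":
    # Ports taken from this guide: Derby network server and Spark Thrift server.
    for name, port in (("Derby", 1527), ("Spark Thrift server", 10000)):
        state = "listening" if port_is_open("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port}: {state}")
```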

Verify dbt setup

dbt debug

The output of the above command should end with this text:

All checks passed!

Run the models

Run example

dbt run -m example
05:47:28  Running with dbt=1.0.0
05:47:28  Found 5 models, 10 tests, 0 snapshots, 0 analyses, 0 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
05:47:29  Concurrency: 1 threads (target='local')
05:47:29  1 of 5 START incremental model analytics.hudi_insert_table...................... [RUN]
05:47:31  1 of 5 OK created incremental model analytics.hudi_insert_table................. [OK in 2.61s]
05:47:31  2 of 5 START incremental model analytics.hudi_insert_overwrite_table............ [RUN]
05:47:34  2 of 5 OK created incremental model analytics.hudi_insert_overwrite_table....... [OK in 3.19s]
05:47:34  3 of 5 START incremental model analytics.hudi_upsert_table...................... [RUN]
05:47:37  3 of 5 OK created incremental model analytics.hudi_upsert_table................. [OK in 2.68s]
05:47:37  4 of 5 START incremental model analytics.hudi_upsert_partitioned_cow_table...... [RUN]
05:47:40  4 of 5 OK created incremental model analytics.hudi_upsert_partitioned_cow_table. [OK in 2.60s]
05:47:40  5 of 5 START incremental model analytics.hudi_upsert_partitioned_mor_table...... [RUN]
05:47:42  5 of 5 OK created incremental model analytics.hudi_upsert_partitioned_mor_table. [OK in 2.53s]
05:47:42  Finished running 5 incremental models in 14.70s.
05:47:42  Completed successfully

Test example

dbt test -m example
05:48:17  Running with dbt=1.0.0
05:48:17  Found 5 models, 10 tests, 0 snapshots, 0 analyses, 0 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
05:48:19  Concurrency: 1 threads (target='local')
05:48:19  1 of 10 START test not_null_hudi_insert_overwrite_table_id...................... [RUN]
05:48:19  1 of 10 PASS not_null_hudi_insert_overwrite_table_id............................ [PASS in 0.50s]
05:48:19  2 of 10 START test not_null_hudi_insert_overwrite_table_name.................... [RUN]
05:48:20  2 of 10 PASS not_null_hudi_insert_overwrite_table_name.......................... [PASS in 0.45s]
05:48:20  3 of 10 START test not_null_hudi_insert_overwrite_table_ts...................... [RUN]
05:48:20  3 of 10 PASS not_null_hudi_insert_overwrite_table_ts............................ [PASS in 0.47s]
05:48:20  4 of 10 START test not_null_hudi_insert_table_id................................ [RUN]
05:48:20  4 of 10 PASS not_null_hudi_insert_table_id...................................... [PASS in 0.44s]
05:48:20  5 of 10 START test not_null_hudi_upsert_table_id................................ [RUN]
05:48:21  5 of 10 PASS not_null_hudi_upsert_table_id...................................... [PASS in 0.38s]
05:48:21  6 of 10 START test not_null_hudi_upsert_table_name.............................. [RUN]
05:48:21  6 of 10 PASS not_null_hudi_upsert_table_name.................................... [PASS in 0.40s]
05:48:21  7 of 10 START test not_null_hudi_upsert_table_ts................................ [RUN]
05:48:22  7 of 10 PASS not_null_hudi_upsert_table_ts...................................... [PASS in 0.38s]
05:48:22  8 of 10 START test unique_hudi_insert_overwrite_table_id........................ [RUN]
05:48:23  8 of 10 PASS unique_hudi_insert_overwrite_table_id.............................. [PASS in 1.32s]
05:48:23  9 of 10 START test unique_hudi_insert_table_id.................................. [RUN]
05:48:24  9 of 10 PASS unique_hudi_insert_table_id........................................ [PASS in 1.26s]
05:48:24  10 of 10 START test unique_hudi_upsert_table_id................................. [RUN]
05:48:25  10 of 10 PASS unique_hudi_upsert_table_id....................................... [PASS in 1.29s]
05:48:26  Finished running 10 tests in 8.23s.
05:48:26  Completed successfully
05:48:26  Done. PASS=10 WARN=0 ERROR=0 SKIP=0 TOTAL=10
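
When these commands run from a script, the final summary line can be turned into counts. The sketch below parses a line in the format shown above; the function names are hypothetical, and the parsing is illustrative only, since dbt also reports failure through a non-zero exit code.

```python
import re
from typing import Dict

# Matches the counters on dbt's final summary line, e.g. "PASS=10".
SUMMARY_FIELD = re.compile(r"\b(PASS|WARN|ERROR|SKIP|TOTAL)=(\d+)")


def parse_summary(line: str) -> Dict[str, int]:
    """Extract the PASS/WARN/ERROR/SKIP/TOTAL counters from a summary line."""
    return {name: int(count) for name, count in SUMMARY_FIELD.findall(line)}


def run_succeeded(summary: Dict[str, int]) -> bool:
    """Treat a run as successful when a summary was found and ERROR is zero."""
    return "TOTAL" in summary and summary.get("ERROR", 0) == 0


if __name__ == "__main__":
    line = "05:48:26  Done. PASS=10 WARN=0 ERROR=0 SKIP=0 TOTAL=10"
    summary = parse_summary(line)
    print(summary, "ok" if run_succeeded(summary) else "failed")
```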

Run example_cdc

Bootstrap the raw_updates and profiles tables.

dbt run -m example_cdc.raw_updates -m example_cdc.profiles

Launch a spark-sql shell to interact with the tables created by example_cdc.

spark-sql \
--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.14.0 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.warehouse.dir=/tmp/hudi/hive/warehouse \
--conf spark.hadoop.hive.metastore.warehouse.dir=/tmp/hudi/hive/warehouse \
--conf spark.hadoop.hive.metastore.schema.verification=false \
--conf spark.hadoop.datanucleus.schema.autoCreateAll=true \
--conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.apache.derby.jdbc.ClientDriver \
--conf 'spark.hadoop.javax.jdo.option.ConnectionURL=jdbc:derby://localhost:1527/default;create=true' \
--conf 'spark.hadoop.hive.cli.print.header=true'

Insert sample records.

use hudi_examples_dbt;
insert into raw_updates values ('101', 'D', UNIX_TIMESTAMP());
insert into raw_updates values ('102', 'E', UNIX_TIMESTAMP());
insert into raw_updates values ('103', 'F', UNIX_TIMESTAMP());

Process the updates and write the new data to profiles.

dbt run -m example_cdc.profiles
spark-sql> refresh table profiles;
spark-sql> select _hoodie_commit_time, user_id, city, updated_at from profiles order by updated_at;
_hoodie_commit_time	user_id	city	updated_at
20231128013722030	101	D	1701157027
20231128013722030	102	E	1701157031
20231128013722030	103	F	1701157035
Time taken: 0.219 seconds, Fetched 3 row(s)
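
The two time columns in this result use different encodings: updated_at holds epoch seconds (from UNIX_TIMESTAMP()), while _hoodie_commit_time is a Hudi instant in yyyyMMddHHmmssSSS form. As a small sketch for reading them, assuming the 17-digit millisecond format shown above, the helpers below (hypothetical names) convert both to datetime values. The instant is parsed as a naive timestamp because the output does not say which timezone the writer used.

```python
from datetime import datetime, timedelta, timezone


def parse_commit_time(instant: str) -> datetime:
    """Parse a 17-digit Hudi instant (yyyyMMddHHmmssSSS) into a naive datetime."""
    if len(instant) != 17 or not instant.isdigit():
        raise ValueError(f"unexpected instant format: {instant!r}")
    # First 14 digits are the second-resolution timestamp, last 3 are milliseconds.
    base = datetime.strptime(instant[:14], "%Y%m%d%H%M%S")
    return base + timedelta(milliseconds=int(instant[14:]))


def parse_updated_at(epoch_seconds: int) -> datetime:
    """Convert epoch seconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


if __name__ == "__main__":
    # Values copied from the first row of the query result above.
    print(parse_commit_time("20231128013722030"))
    print(parse_updated_at(1701157027))
```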

Extract changed data from profiles to profile_changes.

dbt run -m example_cdc.profile_changes
spark-sql> refresh table profile_changes;
spark-sql> select user_id, old_city, new_city from profile_changes order by process_ts;
user_id	old_city	new_city
101	Nil	A
102	Nil	B
103	Nil	C
101	A	D
102	B	E
103	C	F
Time taken: 0.129 seconds, Fetched 6 row(s)
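
To make the shape of profile_changes easier to follow, here is a plain-Python sketch of the same idea: walk the updates in order, remember the last known city per user, and emit an (old, new) pair whenever it changes. This is not the project's SQL model, only an illustration. The function name extract_changes is hypothetical, and the initial cities A, B and C are inferred from the query output above.

```python
from typing import Dict, Iterable, List, Tuple

Update = Tuple[str, str]        # (user_id, city)
Change = Tuple[str, str, str]   # (user_id, old_city, new_city)


def extract_changes(updates: Iterable[Update], missing: str = "Nil") -> List[Change]:
    """Return one change row per update that alters a user's city."""
    current: Dict[str, str] = {}   # last known city per user
    changes: List[Change] = []
    for user_id, city in updates:
        old_city = current.get(user_id, missing)
        if old_city != city:
            changes.append((user_id, old_city, city))
        current[user_id] = city
    return changes


if __name__ == "__main__":
    updates = [
        ("101", "A"), ("102", "B"), ("103", "C"),   # bootstrap rows (inferred)
        ("101", "D"), ("102", "E"), ("103", "F"),   # rows inserted in this guide
    ]
    for row in extract_changes(updates):
        print(*row, sep="\t")
```

Run as a script, this prints the same six rows as the query above, with Nil standing in for a missing previous value.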

Generate documentation

dbt docs generate
dbt docs serve
# then open the URL printed by dbt docs serve and go to the overview page

For more information on dbt: