| --- |
| title: Quickstart |
| keywords: quickstart |
| tags: [quickstart] |
| sidebar: mydoc_sidebar |
| toc: false |
| permalink: quickstart.html |
| --- |
| |
| |
| ## Download Hoodie |
| |
Check out the code and import it into IntelliJ as a normal Maven project.

Build the Maven project from the command line:
| ``` |
| $ mvn clean install -DskipTests -DskipITs |
| |
To work with older versions of Hive (pre Hive-1.2.1), use
| |
| $ mvn clean install -DskipTests -DskipITs -Dhive11 |
| |
| ``` |
| |
{% include callout.html content="You might want to add your Spark jars folder to the project dependencies under 'Module Settings', to be able to run Spark from the IDE" type="info" %}
| |
{% include note.html content="Set up your local Hadoop/Hive test environment so you can play with the entire ecosystem. See [this](http://www.bytearray.io/2016/05/setting-up-hadoopyarnsparkhive-on-mac.html) for reference" %}
| |
| |
| ## Version Compatibility |
| |
Hoodie requires Java 8 to be installed. Hoodie works with Spark 2.x versions. We have verified that Hoodie works with the following combinations of Hadoop/Hive/Spark.
| |
| | Hadoop | Hive | Spark | Instructions to Build Hoodie | |
| | ---- | ----- | ---- | ---- | |
| | 2.6.0-cdh5.7.2 | 1.1.0-cdh5.7.2 | spark-2.[1-3].x | Use "mvn clean install -DskipTests -Dhive11". Jars will have ".hive11" as suffix | |
| | Apache hadoop-2.8.4 | Apache hive-2.3.3 | spark-2.[1-3].x | Use "mvn clean install -DskipTests" | |
| | Apache hadoop-2.7.3 | Apache hive-1.2.1 | spark-2.[1-3].x | Use "mvn clean install -DskipTests" | |
| |
If your environment has other versions of Hadoop/Hive/Spark, please try out Hoodie and let us know if there are any issues; we are limited by our bandwidth in certifying other combinations.
It would be of great help if you could reach out to us with your setup and experience with Hoodie.
| |
| ## Generate a Hoodie Dataset |
| |
### Requirements & Environment Variables
| |
Please set the following environment variables according to your setup. Below is an example setup using a CDH version.
| |
| ``` |
| export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre/ |
| export HIVE_HOME=/var/hadoop/setup/apache-hive-1.1.0-cdh5.7.2-bin |
| export HADOOP_HOME=/var/hadoop/setup/hadoop-2.6.0-cdh5.7.2 |
| export HADOOP_INSTALL=/var/hadoop/setup/hadoop-2.6.0-cdh5.7.2 |
| export HADOOP_CONF_DIR=$HADOOP_INSTALL/etc/hadoop |
| export SPARK_HOME=/var/hadoop/setup/spark-2.3.1-bin-hadoop2.7 |
| export SPARK_INSTALL=$SPARK_HOME |
| export SPARK_CONF_DIR=$SPARK_HOME/conf |
| export PATH=$JAVA_HOME/bin:$HIVE_HOME/bin:$HADOOP_HOME/bin:$SPARK_INSTALL/bin:$PATH |
| ``` |
| |
### Supported APIs
| |
Use the DataSource API to quickly start reading or writing hoodie datasets in a few lines of code. It is ideal for most ingestion use-cases.
Use the RDD API to perform more involved actions on a hoodie dataset.
| |
| #### DataSource API |
| |
Run the __hoodie-spark/src/test/java/HoodieJavaApp.java__ class to place two commits (commit 1 => 100 inserts, commit 2 => 100 updates to the previously inserted 100 records) onto your HDFS/local filesystem. Use the wrapper script
to run it from the command line:
| |
| ``` |
| cd hoodie-spark |
| ./run_hoodie_app.sh --help |
| Usage: <main class> [options] |
| Options: |
| --help, -h |
| Default: false |
| --table-name, -n |
| table name for Hoodie sample table |
| Default: hoodie_rt |
| --table-path, -p |
| path for Hoodie sample table |
| Default: file:///tmp/hoodie/sample-table |
| --table-type, -t |
| One of COPY_ON_WRITE or MERGE_ON_READ |
| Default: COPY_ON_WRITE |
| |
| |
| ``` |
| |
The class lets you choose table names, output paths and one of the storage types. In your own applications, be sure to include the `hoodie-spark` module as a dependency
and follow a similar pattern to write/read datasets via the datasource, as sketched below.
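
For illustration, here is a minimal sketch of that pattern from spark-shell (launched with the hoodie-spark bundle and the spark-avro package, as shown in the SparkSQL section further down). The column names, option values and partition layout used here (`_row_key`, `partition`, `timestamp`, table `hoodie_rt`, a `yyyy/mm/dd` path depth) are assumptions made for this example; adapt them to your own data.

```
// Sketch only: upsert a small DataFrame into a Hoodie dataset via the datasource, then read it back
import org.apache.spark.sql.SaveMode
import spark.implicits._

val basePath = "file:///tmp/hoodie/sample-table"   // assumed output location

// A tiny, made-up input DataFrame; replace with your own source data
val inputDF = Seq(
  ("key-1", "2016/03/15", 1458086400.0, "rider-1", "driver-1"),
  ("key-2", "2016/03/15", 1458086460.0, "rider-2", "driver-2")
).toDF("_row_key", "partition", "timestamp", "rider", "driver")

inputDF.write
  .format("com.uber.hoodie")
  .option("hoodie.datasource.write.recordkey.field", "_row_key")       // record key column
  .option("hoodie.datasource.write.partitionpath.field", "partition")  // partition path column
  .option("hoodie.datasource.write.precombine.field", "timestamp")     // column used to pick the latest record per key
  .option("hoodie.table.name", "hoodie_rt")                            // table name written into commit metadata
  .mode(SaveMode.Append)                                               // Append performs an upsert into the dataset
  .save(basePath)

// Read back through the same datasource; the glob matches the yyyy/mm/dd partition layout assumed above
val readDF = spark.read.format("com.uber.hoodie").load(basePath + "/*/*/*")
readDF.show(false)
```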
| |
| #### RDD API |
| |
The RDD-level APIs give you more power and control, via the `hoodie-client` module.
Refer to the __hoodie-client/src/test/java/HoodieClientExample.java__ class for an example.
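
As a rough sketch of the flow in that example (not a drop-in replacement for it): a write client is built from a config, a commit is started on the timeline, and records are upserted. The `avroSchema` and `recordsRDD` values below are placeholders you would construct yourself, the way HoodieClientExample.java does; the builder methods shown assume the 0.4.x client API.

```
// Sketch only: the hoodie-client (RDD-level) write path
import org.apache.spark.api.java.JavaSparkContext
import com.uber.hoodie.HoodieWriteClient
import com.uber.hoodie.config.HoodieWriteConfig

val jsc = new JavaSparkContext(spark.sparkContext)

// Placeholders (hypothetical): an Avro schema string for your records and a
// JavaRDD of HoodieRecord built from your input data, as in HoodieClientExample.java
val avroSchema: String = ???
val recordsRDD = ???

val cfg = HoodieWriteConfig.newBuilder()
  .withPath("file:///tmp/hoodie/sample-table")   // base path of the dataset
  .withSchema(avroSchema)                        // Avro schema of the records being written
  .forTable("hoodie_rt")                         // table name
  .build()

val client = new HoodieWriteClient(jsc, cfg)
val commitTime = client.startCommit()                 // open a new commit on the timeline
val statuses = client.upsert(recordsRDD, commitTime)  // upsert records; returns per-file write statuses
```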
| |
| |
| |
| ## Query a Hoodie dataset |
| |
| ### Register Dataset to Hive Metastore |
| |
Now, let's see how we can publish this data to Hive.
| |
| #### Starting up Hive locally |
| |
| ``` |
| hdfs namenode # start name node |
| hdfs datanode # start data node |
| |
| bin/hive --service metastore # start metastore |
| bin/hiveserver2 \ |
| --hiveconf hive.root.logger=INFO,console \ |
| --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \ |
--hiveconf hive.stats.autogather=false \
| --hiveconf hive.aux.jars.path=hoodie/packaging/hoodie-hadoop-mr-bundle/target/hoodie-hadoop-mr-bundle-0.4.3-SNAPSHOT.jar |
| |
| ``` |
| |
| |
| #### Hive Sync Tool |
| |
The Hive Sync Tool updates/creates the necessary metadata (schema and partitions) in the Hive metastore.
This allows for schema evolution and for incrementally adding newly written partitions.
It works incrementally by storing the last synced commit time in TBLPROPERTIES and only syncing commits made after that time (you can inspect this after a sync, as shown below).
It can be run as frequently as the ingestion pipeline, so that new partitions and schema evolution changes are reflected immediately.
| |
| ``` |
| cd hoodie-hive |
| ./run_sync_tool.sh |
| --user hive |
| --pass hive |
| --database default |
| --jdbc-url "jdbc:hive2://localhost:10010/" |
| --base-path tmp/hoodie/sample-table/ |
| --table hoodie_test |
| --partitioned-by field1,field2 |
| |
| ``` |
| |
| |
| |
| #### Manually via Beeline |
Add the hoodie-hadoop-mr-bundle jar so Hive can read the Hoodie dataset and answer the query.
Also, for reading Hoodie tables using Hive, the following configs need to be set up:
| |
| ``` |
| hive> set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; |
| hive> set hive.stats.autogather=false; |
| hive> add jar file:///tmp/hoodie-hadoop-mr-bundle-0.4.3.jar; |
| Added [file:///tmp/hoodie-hadoop-mr-bundle-0.4.3.jar] to class path |
| Added resources: [file:///tmp/hoodie-hadoop-mr-bundle-0.4.3.jar] |
| ``` |
| |
Then, you need to create a __ReadOptimized__ Hive table as below (the only type supported as of now) and register the sample partitions
| |
| ``` |
| drop table hoodie_test; |
| CREATE EXTERNAL TABLE hoodie_test(`_row_key` string, |
| `_hoodie_commit_time` string, |
| `_hoodie_commit_seqno` string, |
| rider string, |
| driver string, |
| begin_lat double, |
| begin_lon double, |
| end_lat double, |
| end_lon double, |
| fare double) |
| PARTITIONED BY (`datestr` string) |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
| STORED AS INPUTFORMAT |
| 'com.uber.hoodie.hadoop.HoodieInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION |
| 'hdfs:///tmp/hoodie/sample-table'; |
| |
| ALTER TABLE `hoodie_test` ADD IF NOT EXISTS PARTITION (datestr='2016-03-15') LOCATION 'hdfs:///tmp/hoodie/sample-table/2016/03/15'; |
| ALTER TABLE `hoodie_test` ADD IF NOT EXISTS PARTITION (datestr='2015-03-16') LOCATION 'hdfs:///tmp/hoodie/sample-table/2015/03/16'; |
| ALTER TABLE `hoodie_test` ADD IF NOT EXISTS PARTITION (datestr='2015-03-17') LOCATION 'hdfs:///tmp/hoodie/sample-table/2015/03/17'; |
| |
| set mapreduce.framework.name=yarn; |
| ``` |
| |
| And you can generate a __Realtime__ Hive table, as below |
| |
| ``` |
| DROP TABLE hoodie_rt; |
| CREATE EXTERNAL TABLE hoodie_rt( |
| `_hoodie_commit_time` string, |
| `_hoodie_commit_seqno` string, |
| `_hoodie_record_key` string, |
| `_hoodie_partition_path` string, |
| `_hoodie_file_name` string, |
| timestamp double, |
| `_row_key` string, |
| rider string, |
| driver string, |
| begin_lat double, |
| begin_lon double, |
| end_lat double, |
| end_lon double, |
| fare double) |
| PARTITIONED BY (`datestr` string) |
| ROW FORMAT SERDE |
| 'com.uber.hoodie.hadoop.realtime.HoodieParquetSerde' |
| STORED AS INPUTFORMAT |
| 'com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION |
| 'file:///tmp/hoodie/sample-table'; |
| |
| ALTER TABLE `hoodie_rt` ADD IF NOT EXISTS PARTITION (datestr='2016-03-15') LOCATION 'file:///tmp/hoodie/sample-table/2016/03/15'; |
| ALTER TABLE `hoodie_rt` ADD IF NOT EXISTS PARTITION (datestr='2015-03-16') LOCATION 'file:///tmp/hoodie/sample-table/2015/03/16'; |
| ALTER TABLE `hoodie_rt` ADD IF NOT EXISTS PARTITION (datestr='2015-03-17') LOCATION 'file:///tmp/hoodie/sample-table/2015/03/17'; |
| |
| ``` |
| |
| |
| |
| ### Using different query engines |
| |
Now, we can proceed to query the dataset as we normally would, across all three supported query engines.
| |
| #### HiveQL |
| |
Let's first perform a query on the latest committed snapshot of the table:
| |
| ``` |
| hive> select count(*) from hoodie_test; |
| ... |
| OK |
| 100 |
| Time taken: 18.05 seconds, Fetched: 1 row(s) |
| hive> |
| ``` |
| |
| #### SparkSQL |
| |
| Spark is super easy, once you get Hive working as above. Just spin up a Spark Shell as below |
| |
| ``` |
| $ cd $SPARK_INSTALL |
| $ spark-shell --jars $HUDI_SRC/packaging/hoodie-spark-bundle/target/hoodie-spark-bundle-0.4.3-SNAPSHOT.jar --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --packages com.databricks:spark-avro_2.11:4.0.0 |
| |
| scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) |
| scala> sqlContext.sql("show tables").show(10000) |
| scala> sqlContext.sql("describe hoodie_test").show(10000) |
| scala> sqlContext.sql("describe hoodie_rt").show(10000) |
| scala> sqlContext.sql("select count(*) from hoodie_test").show(10000) |
| ``` |
| |
You can also use the sample queries in __hoodie-utilities/src/test/java/HoodieSparkSQLExample.java__ for running on `hoodie_rt`.
| |
| #### Presto |
| |
Check out the 'master' branch of OSS Presto, build it, and place your installation somewhere.
| |
* Copy the hoodie-hadoop-mr-* jar into $PRESTO_INSTALL/plugin/hive-hadoop2/
* Start up your server and you should be able to query the same Hive table via Presto
| |
| ``` |
| show columns from hive.default.hoodie_test; |
select count(*) from hive.default.hoodie_test;
| ``` |
| |
| |
| |
| ## Incremental Queries of a Hoodie dataset |
| |
Let's now perform a query to obtain __only__ the rows that have changed since a commit in the past.
| |
| ``` |
| hive> set hoodie.hoodie_test.consume.mode=INCREMENTAL; |
| hive> set hoodie.hoodie_test.consume.start.timestamp=001; |
| hive> set hoodie.hoodie_test.consume.max.commits=10; |
| hive> select `_hoodie_commit_time`, rider, driver from hoodie_test where `_hoodie_commit_time` > '001' limit 10; |
| OK |
| All commits :[001, 002] |
| 002 rider-001 driver-001 |
| 002 rider-001 driver-001 |
| 002 rider-002 driver-002 |
| 002 rider-001 driver-001 |
| 002 rider-001 driver-001 |
| 002 rider-002 driver-002 |
| 002 rider-001 driver-001 |
| 002 rider-002 driver-002 |
| 002 rider-002 driver-002 |
| 002 rider-001 driver-001 |
| Time taken: 0.056 seconds, Fetched: 10 row(s) |
| hive> |
| hive> |
| ``` |
| |
| |
| {% include note.html content="This is only supported for Read-optimized tables for now." %} |
| |
| |
| ## A Demo using docker containers |
| |
Let's use a real-world example to see how Hudi works end to end. For this purpose, a self-contained
data infrastructure is brought up in a local Docker cluster on your computer.

The steps below assume you are using a Mac laptop.
| |
| ### Prerequisites |
| |
* Docker Setup : For Mac, please follow the steps as defined in [https://docs.docker.com/v17.12/docker-for-mac/install/]. For running Spark-SQL queries, please ensure at least 6 GB and 4 CPUs are allocated to Docker (see Docker -> Preferences -> Advanced); otherwise, Spark-SQL queries could be killed because of memory issues.
* kafkacat : A command-line utility to publish/consume from Kafka topics. Use `brew install kafkacat` to install it.
* /etc/hosts : The demo references many services running in containers by hostname. Add the following entries to /etc/hosts:
| |
| ``` |
| 127.0.0.1 adhoc-1 |
| 127.0.0.1 adhoc-2 |
| 127.0.0.1 namenode |
| 127.0.0.1 datanode1 |
| 127.0.0.1 hiveserver |
| 127.0.0.1 hivemetastore |
| 127.0.0.1 kafkabroker |
| 127.0.0.1 sparkmaster |
| 127.0.0.1 zookeeper |
| ``` |
| |
| ### Setting up Docker Cluster |
| |
| |
| #### Build Hoodie |
| |
The first step is to build Hoodie:
| ``` |
| cd <HUDI_WORKSPACE> |
| mvn package -DskipTests |
| ``` |
| |
| #### Bringing up Demo Cluster |
| |
The next step is to run the Docker compose script and set up configs for bringing up the cluster.
This will pull the Docker images from Docker Hub and set up the Docker cluster.
| |
| ``` |
| cd docker |
| ./setup_demo.sh |
| .... |
| .... |
| .... |
| Stopping spark-worker-1 ... done |
| Stopping hiveserver ... done |
| Stopping hivemetastore ... done |
| Stopping historyserver ... done |
| ....... |
| ...... |
| Creating network "hudi_demo" with the default driver |
| Creating hive-metastore-postgresql ... done |
| Creating namenode ... done |
| Creating zookeeper ... done |
| Creating kafkabroker ... done |
| Creating hivemetastore ... done |
| Creating historyserver ... done |
| Creating hiveserver ... done |
| Creating datanode1 ... done |
| Creating sparkmaster ... done |
| Creating adhoc-1 ... done |
| Creating adhoc-2 ... done |
| Creating spark-worker-1 ... done |
| Copying spark default config and setting up configs |
| Copying spark default config and setting up configs |
| Copying spark default config and setting up configs |
| varadarb-C02SG7Q3G8WP:docker varadarb$ docker ps |
| ``` |
| |
At this point, the Docker cluster will be up and running. The demo cluster brings up the following services:
| |
| * HDFS Services (NameNode, DataNode) |
| * Spark Master and Worker |
| * Hive Services (Metastore, HiveServer2 along with PostgresDB) |
* Kafka Broker and a Zookeeper Node (Kafka will be used as the upstream source for the demo)
| * Adhoc containers to run Hudi/Hive CLI commands |
| |
| ### Demo |
| |
Stock tracker data will be used to showcase the different Hudi views and the effects of compaction.
| |
Take a look at the directory `docker/demo/data`. There are 2 batches of stock data, each at 1-minute granularity.
The first batch contains stock tracker data for some stock symbols during the first hour of the trading window
(9:30 a.m. to 10:30 a.m.). The second batch contains tracker data for the next 30 minutes (10:30 to 11 a.m.). Hudi will
be used to ingest these batches into a dataset that will contain the latest stock tracker data at hourly granularity.
The batches are windowed intentionally so that the second batch contains updates to some of the rows in the first batch.
| |
| #### Step 1 : Publish the first batch to Kafka |
| |
Upload the first batch to the Kafka topic `stock_ticks`:
| |
| ``` |
| cat docker/demo/data/batch_1.json | kafkacat -b kafkabroker -t stock_ticks -P |
| |
| To check if the new topic shows up, use |
| kafkacat -b kafkabroker -L -J | jq . |
| { |
| "originating_broker": { |
| "id": 1001, |
| "name": "kafkabroker:9092/1001" |
| }, |
| "query": { |
| "topic": "*" |
| }, |
| "brokers": [ |
| { |
| "id": 1001, |
| "name": "kafkabroker:9092" |
| } |
| ], |
| "topics": [ |
| { |
| "topic": "stock_ticks", |
| "partitions": [ |
| { |
| "partition": 0, |
| "leader": 1001, |
| "replicas": [ |
| { |
| "id": 1001 |
| } |
| ], |
| "isrs": [ |
| { |
| "id": 1001 |
| } |
| ] |
| } |
| ] |
| } |
| ] |
| } |
| |
| ``` |
| |
| #### Step 2: Incrementally ingest data from Kafka topic |
| |
Hudi comes with a tool named DeltaStreamer. This tool can connect to a variety of data sources (including Kafka) to
pull changes and apply them to a Hudi dataset using upsert/insert primitives. Here, we will use the tool to pull
the JSON data from the Kafka topic and ingest it into both a COW and a MOR dataset. The tool
automatically initializes the datasets in the file-system if they do not exist yet.
| |
| ``` |
| docker exec -it adhoc-2 /bin/bash |
| |
| # Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow dataset in HDFS |
| spark-submit --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE --storage-type COPY_ON_WRITE --source-class com.uber.hoodie.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /user/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties |
| .... |
| .... |
| 2018-09-24 22:20:00 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped! |
| 2018-09-24 22:20:00 INFO SparkContext:54 - Successfully stopped SparkContext |
| # Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_mor dataset in HDFS |
| spark-submit --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE --storage-type MERGE_ON_READ --source-class com.uber.hoodie.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /user/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props /var/demo/config/kafka-source.properties |
| .... |
| 2018-09-24 22:22:01 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped! |
| 2018-09-24 22:22:01 INFO SparkContext:54 - Successfully stopped SparkContext |
| .... |
| |
# As part of the setup (look at setup_demo.sh), the configs needed for DeltaStreamer are uploaded to HDFS. The configs
# contain mostly Kafka connectivity settings, the avro-schema to be used for ingestion, along with key and partitioning fields.
| |
| exit |
| ``` |
| |
You can use the HDFS web UI to browse the datasets:
| `http://namenode:50070/explorer.html#/user/hive/warehouse/stock_ticks_cow`. |
| |
| You can explore the new partition folder created in the dataset along with a "deltacommit" |
| file under .hoodie which signals a successful commit. |
| |
| There will be a similar setup when you browse the MOR dataset |
| `http://namenode:50070/explorer.html#/user/hive/warehouse/stock_ticks_mor` |
| |
| |
| #### Step 3: Sync with Hive |
| |
At this step, the datasets are available in HDFS. We need to sync with Hive to create new Hive tables and add partitions
in order to run Hive queries against those datasets.
| |
| ``` |
| docker exec -it adhoc-2 /bin/bash |
| |
# This command takes the HiveServer URL and the COW Hudi dataset location in HDFS and syncs the HDFS state to Hive
| /var/hoodie/ws/hoodie-hive/run_sync_tool.sh --jdbc-url jdbc:hive2://hiveserver:10000 --user hive --pass hive --partitioned-by dt --base-path /user/hive/warehouse/stock_ticks_cow --database default --table stock_ticks_cow |
| ..... |
| 2018-09-24 22:22:45,568 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(112)) - Sync complete for stock_ticks_cow |
| ..... |
| |
# Now run hive-sync for the second dataset in HDFS using Merge-On-Read (MOR storage)
| /var/hoodie/ws/hoodie-hive/run_sync_tool.sh --jdbc-url jdbc:hive2://hiveserver:10000 --user hive --pass hive --partitioned-by dt --base-path /user/hive/warehouse/stock_ticks_mor --database default --table stock_ticks_mor |
| ... |
| 2018-09-24 22:23:09,171 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(112)) - Sync complete for stock_ticks_mor |
| ... |
| 2018-09-24 22:23:09,559 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(112)) - Sync complete for stock_ticks_mor_rt |
| .... |
| exit |
| ``` |
After executing the above commands, you will notice:

1. A Hive table named `stock_ticks_cow` was created, providing a Read-Optimized view of the Copy-On-Write dataset.
2. Two new tables, `stock_ticks_mor` and `stock_ticks_mor_rt`, were created for the Merge-On-Read dataset. The former
provides the ReadOptimized view of the Hudi dataset and the latter provides the Realtime view.
| |
| |
| #### Step 4 (a): Run Hive Queries |
| |
Run a Hive query to find the latest timestamp ingested for stock symbol 'GOOG'. You will notice that both the Read-Optimized
views (for the COW and MOR datasets) and the Realtime view (for the MOR dataset) give the same value "10:29 a.m.", as Hudi creates a
parquet file for the first batch of data.
| |
| ``` |
| docker exec -it adhoc-2 /bin/bash |
| beeline -u jdbc:hive2://hiveserver:10000 --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false |
| # List Tables |
| 0: jdbc:hive2://hiveserver:10000> show tables; |
| +---------------------+--+ |
| | tab_name | |
| +---------------------+--+ |
| | stock_ticks_cow | |
| | stock_ticks_mor | |
| | stock_ticks_mor_rt | |
| +---------------------+--+ |
| 2 rows selected (0.801 seconds) |
| 0: jdbc:hive2://hiveserver:10000> |
| |
| |
| # Look at partitions that were added |
| 0: jdbc:hive2://hiveserver:10000> show partitions stock_ticks_mor_rt; |
| +----------------+--+ |
| | partition | |
| +----------------+--+ |
| | dt=2018-08-31 | |
| +----------------+--+ |
| 1 row selected (0.24 seconds) |
| |
| |
| # COPY-ON-WRITE Queries: |
| ========================= |
| |
| |
| 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'; |
| +---------+----------------------+--+ |
| | symbol | _c1 | |
| +---------+----------------------+--+ |
| | GOOG | 2018-08-31 10:29:00 | |
| +---------+----------------------+--+ |
| |
| Now, run a projection query: |
| |
| 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'; |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924221953 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924221953 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| |
| # Merge-On-Read Queries: |
| ========================== |
| |
| Lets run similar queries against M-O-R dataset. Lets look at both |
| ReadOptimized and Realtime views supported by M-O-R dataset |
| |
| # Run against ReadOptimized View. Notice that the latest timestamp is 10:29 |
| 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'; |
| WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases. |
| +---------+----------------------+--+ |
| | symbol | _c1 | |
| +---------+----------------------+--+ |
| | GOOG | 2018-08-31 10:29:00 | |
| +---------+----------------------+--+ |
| 1 row selected (6.326 seconds) |
| |
| |
| # Run against Realtime View. Notice that the latest timestamp is again 10:29 |
| |
| 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'; |
| WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases. |
| +---------+----------------------+--+ |
| | symbol | _c1 | |
| +---------+----------------------+--+ |
| | GOOG | 2018-08-31 10:29:00 | |
| +---------+----------------------+--+ |
| 1 row selected (1.606 seconds) |
| |
| |
| # Run projection query against Read Optimized and Realtime tables |
| |
| 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG'; |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'; |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| exit |
| exit |
| ``` |
| |
| #### Step 4 (b): Run Spark-SQL Queries |
Hudi supports Spark as a query processor, just like Hive. Here are the same Hive queries
running in Spark-SQL:
| |
| ``` |
| docker exec -it adhoc-1 /bin/bash |
| $SPARK_INSTALL/bin/spark-shell --jars $HUDI_SPARK_BUNDLE --master local[2] --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --executor-memory 3G --num-executors 1 --packages com.databricks:spark-avro_2.11:4.0.0 |
| ... |
| |
| Welcome to |
| ____ __ |
| / __/__ ___ _____/ /__ |
| _\ \/ _ \/ _ `/ __/ '_/ |
| /___/ .__/\_,_/_/ /_/\_\ version 2.3.1 |
| /_/ |
| |
| Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181) |
| Type in expressions to have them evaluated. |
| Type :help for more information. |
| |
| scala> |
| scala> spark.sql("show tables").show(100, false) |
| +--------+------------------+-----------+ |
| |database|tableName |isTemporary| |
| +--------+------------------+-----------+ |
| |default |stock_ticks_cow |false | |
| |default |stock_ticks_mor |false | |
| |default |stock_ticks_mor_rt|false | |
| +--------+------------------+-----------+ |
| |
| # Copy-On-Write Table |
| |
| ## Run max timestamp query against COW table |
| |
| scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false) |
| [Stage 0:> (0 + 1) / 1]SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". |
| SLF4J: Defaulting to no-operation (NOP) logger implementation |
| SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. |
| +------+-------------------+ |
| |symbol|max(ts) | |
| +------+-------------------+ |
| |GOOG |2018-08-31 10:29:00| |
| +------+-------------------+ |
| |
| ## Projection Query |
| |
| scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'").show(100, false) |
| +-------------------+------+-------------------+------+---------+--------+ |
| |_hoodie_commit_time|symbol|ts |volume|open |close | |
| +-------------------+------+-------------------+------+---------+--------+ |
| |20180924221953 |GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 | |
| |20180924221953 |GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085| |
| +-------------------+------+-------------------+------+---------+--------+ |
| |
| # Merge-On-Read Queries: |
| ========================== |
| |
| Lets run similar queries against M-O-R dataset. Lets look at both |
| ReadOptimized and Realtime views supported by M-O-R dataset |
| |
| # Run against ReadOptimized View. Notice that the latest timestamp is 10:29 |
| scala> spark.sql("select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'").show(100, false) |
| +------+-------------------+ |
| |symbol|max(ts) | |
| +------+-------------------+ |
| |GOOG |2018-08-31 10:29:00| |
| +------+-------------------+ |
| |
| |
| # Run against Realtime View. Notice that the latest timestamp is again 10:29 |
| |
| scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false) |
| +------+-------------------+ |
| |symbol|max(ts) | |
| +------+-------------------+ |
| |GOOG |2018-08-31 10:29:00| |
| +------+-------------------+ |
| |
| # Run projection query against Read Optimized and Realtime tables |
| |
| scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG'").show(100, false) |
| +-------------------+------+-------------------+------+---------+--------+ |
| |_hoodie_commit_time|symbol|ts |volume|open |close | |
| +-------------------+------+-------------------+------+---------+--------+ |
| |20180924222155 |GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 | |
| |20180924222155 |GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085| |
| +-------------------+------+-------------------+------+---------+--------+ |
| |
| scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false) |
| +-------------------+------+-------------------+------+---------+--------+ |
| |_hoodie_commit_time|symbol|ts |volume|open |close | |
| +-------------------+------+-------------------+------+---------+--------+ |
| |20180924222155 |GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 | |
| |20180924222155 |GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085| |
| +-------------------+------+-------------------+------+---------+--------+ |
| |
| ``` |
| |
| |
| #### Step 5: Upload second batch to Kafka and run DeltaStreamer to ingest |
| |
Upload the second batch of data and ingest this batch using DeltaStreamer. As this batch does not bring in any new
partitions, there is no need to run hive-sync.
| |
| ``` |
| cat docker/demo/data/batch_2.json | kafkacat -b kafkabroker -t stock_ticks -P |
| |
| # Within Docker container, run the ingestion command |
| docker exec -it adhoc-2 /bin/bash |
| |
| # Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow dataset in HDFS |
| spark-submit --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE --storage-type COPY_ON_WRITE --source-class com.uber.hoodie.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /user/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties |
| |
| # Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_mor dataset in HDFS |
| spark-submit --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE --storage-type MERGE_ON_READ --source-class com.uber.hoodie.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /user/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props /var/demo/config/kafka-source.properties |
| |
| exit |
| ``` |
With the Copy-On-Write table, the second ingestion by DeltaStreamer resulted in a new version of the Parquet file being created.
See `http://namenode:50070/explorer.html#/user/hive/warehouse/stock_ticks_cow/2018/08/31`

With the Merge-On-Read table, the second ingestion merely appended the batch to an unmerged delta (log) file.
Take a look at the HDFS filesystem to get an idea: `http://namenode:50070/explorer.html#/user/hive/warehouse/stock_ticks_mor/2018/08/31`
| |
| #### Step 6(a): Run Hive Queries |
| |
With the Copy-On-Write table, the Read-Optimized view immediately sees the changes from the second batch once the batch
is committed, since each ingestion creates newer versions of Parquet files.

With the Merge-On-Read table, the second ingestion merely appended the batch to an unmerged delta (log) file.
This is when the ReadOptimized and Realtime views start to provide different results. The ReadOptimized view will still
return "10:29 a.m." as it only reads from the Parquet file, while the Realtime view will do an on-the-fly merge and return
the latest committed data, which is "10:59 a.m.".
| |
| ``` |
| docker exec -it adhoc-2 /bin/bash |
| beeline -u jdbc:hive2://hiveserver:10000 --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false |
| |
| # Copy On Write Table: |
| |
| 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'; |
| WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases. |
| +---------+----------------------+--+ |
| | symbol | _c1 | |
| +---------+----------------------+--+ |
| | GOOG | 2018-08-31 10:59:00 | |
| +---------+----------------------+--+ |
| 1 row selected (1.932 seconds) |
| |
| 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'; |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924221953 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924224524 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| As you can notice, the above queries now reflect the changes that came as part of ingesting second batch. |
| |
| |
| # Merge On Read Table: |
| |
| # Read Optimized View |
| 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'; |
| WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases. |
| +---------+----------------------+--+ |
| | symbol | _c1 | |
| +---------+----------------------+--+ |
| | GOOG | 2018-08-31 10:29:00 | |
| +---------+----------------------+--+ |
| 1 row selected (1.6 seconds) |
| |
| 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG'; |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| # Realtime View |
| 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'; |
| WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases. |
| +---------+----------------------+--+ |
| | symbol | _c1 | |
| +---------+----------------------+--+ |
| | GOOG | 2018-08-31 10:59:00 | |
| +---------+----------------------+--+ |
| |
| 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'; |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924224537 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| exit |
| exit |
| ``` |
| |
| #### Step 6(b): Run Spark SQL Queries |
| |
| Running the same queries in Spark-SQL: |
| |
| ``` |
| docker exec -it adhoc-1 /bin/bash |
| bash-4.4# $SPARK_INSTALL/bin/spark-shell --jars $HUDI_SPARK_BUNDLE --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --master local[2] --executor-memory 3G --num-executors 1 --packages com.databricks:spark-avro_2.11:4.0.0 |
| |
| # Copy On Write Table: |
| |
| scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false) |
| +------+-------------------+ |
| |symbol|max(ts) | |
| +------+-------------------+ |
| |GOOG |2018-08-31 10:59:00| |
| +------+-------------------+ |
| |
| scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'").show(100, false) |
| |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924221953 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924224524 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| As you can notice, the above queries now reflect the changes that came as part of ingesting second batch. |
| |
| |
| # Merge On Read Table: |
| |
| # Read Optimized View |
| scala> spark.sql("select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'").show(100, false) |
| +---------+----------------------+--+ |
| | symbol | _c1 | |
| +---------+----------------------+--+ |
| | GOOG | 2018-08-31 10:29:00 | |
| +---------+----------------------+--+ |
| 1 row selected (1.6 seconds) |
| |
| scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG'").show(100, false) |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| # Realtime View |
| scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false) |
| +---------+----------------------+--+ |
| | symbol | _c1 | |
| +---------+----------------------+--+ |
| | GOOG | 2018-08-31 10:59:00 | |
| +---------+----------------------+--+ |
| |
| scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false) |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924224537 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| exit |
| exit |
| ``` |
| |
| #### Step 7 : Incremental Query for COPY-ON-WRITE Table |
| |
With 2 batches of data ingested, let's showcase the support for incremental queries in Hudi Copy-On-Write datasets.

Let's take the same projection query example:
| ``` |
| docker exec -it adhoc-2 /bin/bash |
| beeline -u jdbc:hive2://hiveserver:10000 --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false |
| |
| 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'; |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924064621 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924065039 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| ``` |
| |
As you can see from the above query, there are 2 commits in timeline order: 20180924064621 and 20180924065039.
When you follow the steps yourself, you will get different commit timestamps; substitute them for the ones above.

To show the effect of an incremental query, let us assume that a reader has already seen the changes from ingesting
the first batch. Now, for the reader to see the effect of the second batch, they have to set the start timestamp to
the commit time of the first batch (20180924064621) and run an incremental query.
| |
`Hudi incremental mode` provides efficient scanning for incremental queries by filtering out files that do not have any
candidate rows, using Hudi-managed metadata.
| |
| ``` |
| docker exec -it adhoc-2 /bin/bash |
| beeline -u jdbc:hive2://hiveserver:10000 --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false |
| 0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_cow.consume.mode=INCREMENTAL; |
| No rows affected (0.009 seconds) |
| 0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_cow.consume.max.commits=3; |
| No rows affected (0.009 seconds) |
| 0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_cow.consume.start.timestamp=20180924064621; |
| |
# With the above setting, file-ids that do not have any updates from the commit 20180924065039 are filtered out without scanning.
| # Here is the incremental query : |
| |
| 0: jdbc:hive2://hiveserver:10000> |
| 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20180924064621'; |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924065039 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| 1 row selected (0.83 seconds) |
| 0: jdbc:hive2://hiveserver:10000> |
| ``` |
| |
| ##### Incremental Query with Spark SQL: |
| ``` |
| docker exec -it adhoc-1 /bin/bash |
| bash-4.4# $SPARK_INSTALL/bin/spark-shell --jars $HUDI_SPARK_BUNDLE --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --master local[2] --executor-memory 3G --num-executors 1 --packages com.databricks:spark-avro_2.11:4.0.0 |
| Welcome to |
| ____ __ |
| / __/__ ___ _____/ /__ |
| _\ \/ _ \/ _ `/ __/ '_/ |
| /___/ .__/\_,_/_/ /_/\_\ version 2.3.1 |
| /_/ |
| |
| Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181) |
| Type in expressions to have them evaluated. |
| Type :help for more information. |
| |
| scala> import com.uber.hoodie.DataSourceReadOptions |
| import com.uber.hoodie.DataSourceReadOptions |
| |
# In the below query, 20180924064621 is the first commit's timestamp
| scala> val hoodieIncViewDF = spark.read.format("com.uber.hoodie").option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20180924064621").load("/user/hive/warehouse/stock_ticks_cow") |
| SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". |
| SLF4J: Defaulting to no-operation (NOP) logger implementation |
| SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. |
| hoodieIncViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 15 more fields] |
| |
| scala> hoodieIncViewDF.registerTempTable("stock_ticks_cow_incr_tmp1") |
| warning: there was one deprecation warning; re-run with -deprecation for details |
| |
| scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_incr_tmp1 where symbol = 'GOOG'").show(100, false); |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924065039 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| ``` |
| |
| |
| #### Step 8: Schedule and Run Compaction for Merge-On-Read dataset |
| |
Let's schedule and run a compaction to create a new version of the columnar file, so that read-optimized readers see fresher data.
Again, you can use the Hudi CLI to manually schedule and run the compaction:
| |
| ``` |
| docker exec -it adhoc-1 /bin/bash |
root@adhoc-1:/opt# /var/hoodie/ws/hoodie-cli/hoodie-cli.sh
| ============================================ |
| * * |
| * _ _ _ _ * |
| * | | | | | (_) * |
| * | |__| | ___ ___ __| |_ ___ * |
| * | __ |/ _ \ / _ \ / _` | |/ _ \ * |
| * | | | | (_) | (_) | (_| | | __/ * |
| * |_| |_|\___/ \___/ \__,_|_|\___| * |
| * * |
| ============================================ |
| |
| Welcome to Hoodie CLI. Please type help if you are looking for help. |
| hoodie->connect --path /user/hive/warehouse/stock_ticks_mor |
| 18/09/24 06:59:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable |
| 18/09/24 06:59:35 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from /user/hive/warehouse/stock_ticks_mor |
| 18/09/24 06:59:35 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://namenode:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1261652683_11, ugi=root (auth:SIMPLE)]]] |
| 18/09/24 06:59:35 INFO table.HoodieTableConfig: Loading dataset properties from /user/hive/warehouse/stock_ticks_mor/.hoodie/hoodie.properties |
| 18/09/24 06:59:36 INFO table.HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ from /user/hive/warehouse/stock_ticks_mor |
| Metadata for table stock_ticks_mor loaded |
| |
| # Ensure no compactions are present |
| |
| hoodie:stock_ticks_mor->compactions show all |
| 18/09/24 06:59:54 INFO timeline.HoodieActiveTimeline: Loaded instants [[20180924064636__clean__COMPLETED], [20180924064636__deltacommit__COMPLETED], [20180924065057__clean__COMPLETED], [20180924065057__deltacommit__COMPLETED]] |
| ___________________________________________________________________ |
| | Compaction Instant Time| State | Total FileIds to be Compacted| |
| |==================================================================| |
| |
| # Schedule a compaction. This will use Spark Launcher to schedule compaction |
| hoodie:stock_ticks_mor->compaction schedule |
| .... |
| Compaction successfully completed for 20180924070031 |
| |
| # Now refresh and check again. You will see that there is a new compaction requested |
| |
| hoodie:stock_ticks->connect --path /user/hive/warehouse/stock_ticks_mor |
| 18/09/24 07:01:16 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from /user/hive/warehouse/stock_ticks_mor |
| 18/09/24 07:01:16 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://namenode:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1261652683_11, ugi=root (auth:SIMPLE)]]] |
| 18/09/24 07:01:16 INFO table.HoodieTableConfig: Loading dataset properties from /user/hive/warehouse/stock_ticks_mor/.hoodie/hoodie.properties |
| 18/09/24 07:01:16 INFO table.HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ from /user/hive/warehouse/stock_ticks_mor |
| Metadata for table stock_ticks_mor loaded |
| |
| hoodie:stock_ticks_mor->compactions show all |
| 18/09/24 06:34:12 INFO timeline.HoodieActiveTimeline: Loaded instants [[20180924041125__clean__COMPLETED], [20180924041125__deltacommit__COMPLETED], [20180924042735__clean__COMPLETED], [20180924042735__deltacommit__COMPLETED], [==>20180924063245__compaction__REQUESTED]] |
| ___________________________________________________________________ |
| | Compaction Instant Time| State | Total FileIds to be Compacted| |
| |==================================================================| |
| | 20180924070031 | REQUESTED| 1 | |
| |
| # Execute the compaction. The compaction instant value passed below must be the one displayed in the above "compactions show all" query |
| hoodie:stock_ticks_mor->compaction run --compactionInstant 20180924070031 --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1 |
| .... |
| Compaction successfully completed for 20180924070031 |
| |
| |
| ## Now check if compaction is completed |
| |
| hoodie:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor |
| 18/09/24 07:03:00 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from /user/hive/warehouse/stock_ticks_mor |
| 18/09/24 07:03:00 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://namenode:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1261652683_11, ugi=root (auth:SIMPLE)]]] |
| 18/09/24 07:03:00 INFO table.HoodieTableConfig: Loading dataset properties from /user/hive/warehouse/stock_ticks_mor/.hoodie/hoodie.properties |
| 18/09/24 07:03:00 INFO table.HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ from /user/hive/warehouse/stock_ticks_mor |
| Metadata for table stock_ticks_mor loaded |
| |
| hoodie:stock_ticks->compactions show all |
| 18/09/24 07:03:15 INFO timeline.HoodieActiveTimeline: Loaded instants [[20180924064636__clean__COMPLETED], [20180924064636__deltacommit__COMPLETED], [20180924065057__clean__COMPLETED], [20180924065057__deltacommit__COMPLETED], [20180924070031__commit__COMPLETED]] |
| ___________________________________________________________________ |
| | Compaction Instant Time| State | Total FileIds to be Compacted| |
| |==================================================================| |
| | 20180924070031 | COMPLETED| 1 | |
| |
| ``` |
| |
| #### Step 9: Run Hive Queries including incremental queries |
| |
You will see that both the ReadOptimized and Realtime views now show the latest committed data.
Let's also run the incremental query for the MOR table.
From the query output below, it is clear that the first commit time for the MOR table is 20180924064636
and the second commit time is 20180924070031.
| |
| ``` |
| docker exec -it adhoc-2 /bin/bash |
| beeline -u jdbc:hive2://hiveserver:10000 --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false |
| |
| # Read Optimized View |
| 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'; |
| WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases. |
| +---------+----------------------+--+ |
| | symbol | _c1 | |
| +---------+----------------------+--+ |
| | GOOG | 2018-08-31 10:59:00 | |
| +---------+----------------------+--+ |
| 1 row selected (1.6 seconds) |
| |
| 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG'; |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924064636 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| # Realtime View |
| 0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'; |
| WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases. |
| +---------+----------------------+--+ |
| | symbol | _c1 | |
| +---------+----------------------+--+ |
| | GOOG | 2018-08-31 10:59:00 | |
| +---------+----------------------+--+ |
| |
| 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'; |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924064636 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| # Incremental View: |
| |
| 0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_mor.consume.mode=INCREMENTAL; |
| No rows affected (0.008 seconds) |
| # Max-Commits covers both second batch and compaction commit |
| 0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_mor.consume.max.commits=3; |
| No rows affected (0.007 seconds) |
| 0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_mor.consume.start.timestamp=20180924064636; |
| No rows affected (0.013 seconds) |
| # Query: |
| 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG' and `_hoodie_commit_time` > '20180924064636'; |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| exit |
| exit |
| ``` |
| |
| ##### Read Optimized and Realtime Views for MOR with Spark-SQL after compaction |
| |
| ``` |
| docker exec -it adhoc-1 /bin/bash |
| bash-4.4# $SPARK_INSTALL/bin/spark-shell --jars $HUDI_SPARK_BUNDLE --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --master local[2] --executor-memory 3G --num-executors 1 --packages com.databricks:spark-avro_2.11:4.0.0 |
| |
| # Read Optimized View |
| scala> spark.sql("select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'").show(100, false) |
| +---------+----------------------+--+ |
| | symbol | _c1 | |
| +---------+----------------------+--+ |
| | GOOG | 2018-08-31 10:59:00 | |
| +---------+----------------------+--+ |
| 1 row selected (1.6 seconds) |
| |
| scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG'").show(100, false) |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924064636 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| |
| # Realtime View |
| scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false) |
| +---------+----------------------+--+ |
| | symbol | _c1 | |
| +---------+----------------------+--+ |
| | GOOG | 2018-08-31 10:59:00 | |
| +---------+----------------------+--+ |
| |
| scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false) |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | _hoodie_commit_time | symbol | ts | volume | open | close | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| | 20180924064636 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | |
| | 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 | |
| +----------------------+---------+----------------------+---------+------------+-----------+--+ |
| ``` |
| |
| |
| This brings the demo to an end. |
| |
| ## Testing Hoodie in Local Docker environment |
| |
You can bring up a Docker environment containing Hadoop, Hive and Spark services with support for Hoodie.
| ``` |
| $ mvn pre-integration-test -DskipTests |
| ``` |
The above command builds Docker images for all the services, with the
current Hoodie source installed at /var/hoodie/ws, and also brings up the services using a compose file. We
currently use Hadoop (v2.8.4), Hive (v2.3.3) and Spark (v2.3.1) in the Docker images.
| |
| To bring down the containers |
| ``` |
| $ cd hoodie-integ-test |
| $ mvn docker-compose:down |
| ``` |
| |
If you want to bring the Docker containers back up, use
| ``` |
| $ cd hoodie-integ-test |
| $ mvn docker-compose:up -DdetachedMode=true |
| ``` |
| |
Hoodie is a library that operates in a broader data analytics/ingestion environment
involving Hadoop, Hive and Spark. Interoperability with all these systems is a key objective for us. We are
actively adding integration tests under __hoodie-integ-test/src/test/java__ that make use of this
docker environment (see __hoodie-integ-test/src/test/java/com/uber/hoodie/integ/ITTestHoodieSanity.java__).
| |
| |
#### Building Local Docker Containers
| |
The Docker images required for the demo and for running integration tests are already in Docker Hub. The Docker images
and compose scripts are carefully implemented so that they serve a dual purpose:

1. The Docker images have inbuilt Hudi jar files, with environment variables pointing to those jars (HUDI_HADOOP_BUNDLE, ...)
2. For running integration tests, we need the jars generated locally to be used for running services within Docker. The
docker-compose scripts (see `docker/compose/docker-compose_hadoop284_hive233_spark231.yml`) ensure local jars override
the inbuilt jars by mounting the local HUDI workspace over the Docker location

This helps avoid maintaining separate Docker images and avoids the costly step of building HUDI Docker images locally.
If you want to test Hudi from a location with lower network bandwidth, you can still build local images by running
the script `docker/build_local_docker_images.sh` before running `docker/setup_demo.sh`.
| |
| Here are the commands: |
| |
| ``` |
| cd docker |
| ./build_local_docker_images.sh |
| ..... |
| |
| [INFO] Reactor Summary: |
| [INFO] |
| [INFO] Hoodie ............................................. SUCCESS [ 1.709 s] |
| [INFO] hoodie-common ...................................... SUCCESS [ 9.015 s] |
| [INFO] hoodie-hadoop-mr ................................... SUCCESS [ 1.108 s] |
| [INFO] hoodie-client ...................................... SUCCESS [ 4.409 s] |
| [INFO] hoodie-hive ........................................ SUCCESS [ 0.976 s] |
| [INFO] hoodie-spark ....................................... SUCCESS [ 26.522 s] |
| [INFO] hoodie-utilities ................................... SUCCESS [ 16.256 s] |
| [INFO] hoodie-cli ......................................... SUCCESS [ 11.341 s] |
| [INFO] hoodie-hadoop-mr-bundle ............................ SUCCESS [ 1.893 s] |
| [INFO] hoodie-hive-bundle ................................. SUCCESS [ 14.099 s] |
| [INFO] hoodie-spark-bundle ................................ SUCCESS [ 58.252 s] |
| [INFO] hoodie-hadoop-docker ............................... SUCCESS [ 0.612 s] |
| [INFO] hoodie-hadoop-base-docker .......................... SUCCESS [04:04 min] |
| [INFO] hoodie-hadoop-namenode-docker ...................... SUCCESS [ 6.142 s] |
| [INFO] hoodie-hadoop-datanode-docker ...................... SUCCESS [ 7.763 s] |
| [INFO] hoodie-hadoop-history-docker ....................... SUCCESS [ 5.922 s] |
| [INFO] hoodie-hadoop-hive-docker .......................... SUCCESS [ 56.152 s] |
| [INFO] hoodie-hadoop-sparkbase-docker ..................... SUCCESS [01:18 min] |
| [INFO] hoodie-hadoop-sparkmaster-docker ................... SUCCESS [ 2.964 s] |
| [INFO] hoodie-hadoop-sparkworker-docker ................... SUCCESS [ 3.032 s] |
| [INFO] hoodie-hadoop-sparkadhoc-docker .................... SUCCESS [ 2.764 s] |
| [INFO] hoodie-integ-test .................................. SUCCESS [ 1.785 s] |
| [INFO] ------------------------------------------------------------------------ |
| [INFO] BUILD SUCCESS |
| [INFO] ------------------------------------------------------------------------ |
| [INFO] Total time: 09:15 min |
| [INFO] Finished at: 2018-09-10T17:47:37-07:00 |
| [INFO] Final Memory: 236M/1848M |
| [INFO] ------------------------------------------------------------------------ |
| ``` |