This guide gets you up and running with real-time analytics on Fluss using Apache Flink, and covers some of Fluss's most powerful features. The scenario is derived from TPC-H Q5.
For more information on working with Flink, refer to the Apache Flink Engine section.
Before proceeding with this guide, ensure that Docker and the Docker Compose plugin are installed on your machine. All commands were tested with Docker version 27.4.0 and Docker Compose version v2.30.3.
:::note
We encourage you to use a recent version of Docker and Compose v2 (however, Compose v1 might work with a few adaptations).
:::
We will use Docker Compose to spin up the required components for this tutorial.

Create a working directory for this guide:

```shell
mkdir fluss-quickstart-flink
cd fluss-quickstart-flink
```
Create a docker-compose.yml file with the following content:

```yaml
services:
  #begin Fluss cluster
  coordinator-server:
    image: apache/fluss:$FLUSS_DOCKER_VERSION$
    command: coordinatorServer
    depends_on:
      - zookeeper
    environment:
      - |
        FLUSS_PROPERTIES=
        zookeeper.address: zookeeper:2181
        bind.listeners: FLUSS://coordinator-server:9123
        remote.data.dir: /tmp/fluss/remote-data
  tablet-server:
    image: apache/fluss:$FLUSS_DOCKER_VERSION$
    command: tabletServer
    depends_on:
      - coordinator-server
    environment:
      - |
        FLUSS_PROPERTIES=
        zookeeper.address: zookeeper:2181
        bind.listeners: FLUSS://tablet-server:9123
        data.dir: /tmp/fluss/data
        remote.data.dir: /tmp/fluss/remote-data
        kv.snapshot.interval: 0s
  zookeeper:
    restart: always
    image: zookeeper:3.9.2
  #end
  #begin Flink cluster
  jobmanager:
    image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
    ports:
      - "8083:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
        taskmanager.memory.process.size: 2048m
        taskmanager.memory.framework.off-heap.size: 256m
  #end
```
The Docker Compose environment consists of the following containers:
- Fluss cluster: a Fluss CoordinatorServer, a Fluss TabletServer and a ZooKeeper server.
- Flink cluster: a Flink JobManager and a Flink TaskManager container to execute queries.

Note: The apache/fluss-quickstart-flink image is based on flink:1.20.1-java17 and includes the fluss-flink connector and flink-connector-faker to simplify this guide.
docker compose up -d
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode.
Run
docker container ls -a
to check whether all containers are running properly.
You can also visit http://localhost:8083/ to see if Flink is running normally.
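If you would rather script this check, the sketch below polls the Flink REST API's /overview endpoint through host port 8083, which docker-compose.yml maps to the JobManager's port 8081. It assumes Python 3 on the host; the helper names fetch_overview, is_cluster_ready and wait_for_flink are hypothetical and not part of Fluss or Flink. It treats the cluster as ready once at least one TaskManager has registered.

```python
import http.client
import json
import time
import urllib.request

# docker-compose.yml maps host port 8083 to the JobManager's REST port 8081.
OVERVIEW_URL = "http://localhost:8083/overview"


def is_cluster_ready(overview: dict, min_taskmanagers: int = 1) -> bool:
    """Return True if an /overview payload reports at least min_taskmanagers TaskManagers."""
    return overview.get("taskmanagers", 0) >= min_taskmanagers


def fetch_overview(url: str = OVERVIEW_URL, timeout: float = 2.0):
    """Return the decoded /overview JSON, or None if the REST API is unreachable or invalid."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return json.load(resp)
    except (OSError, ValueError, http.client.HTTPException):
        # URLError/HTTPError are OSError subclasses; malformed JSON raises ValueError.
        return None


def wait_for_flink(url: str = OVERVIEW_URL, attempts: int = 30, delay: float = 2.0) -> bool:
    """Poll /overview until a TaskManager has registered or the attempts run out."""
    for attempt in range(attempts):
        overview = fetch_overview(url)
        if isinstance(overview, dict) and is_cluster_ready(overview):
            return True
        if attempt < attempts - 1:
            time.sleep(delay)
    return False
```

For example, calling wait_for_flink() right after docker compose up -d returns True once the TaskManager container has registered with the JobManager, and False if that does not happen within roughly a minute.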
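:::note
If you run this guide in your own Flink environment instead, put the fluss-flink and flink-connector-faker jars into FLINK_HOME/lib/.

All following docker compose commands should be executed in the working directory you created, which contains the docker-compose.yml file.
:::

Congratulations, you are all set!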
First, use the following command to enter the Flink SQL CLI Container:
docker compose exec jobmanager ./sql-client
Note: To simplify this guide, three temporary tables have been pre-created with the faker connector to generate data. You can view their schemas by running the following commands:
SHOW CREATE TABLE source_customer;
SHOW CREATE TABLE source_order;
SHOW CREATE TABLE source_nation;
Use the following SQL to create a Fluss catalog:
CREATE CATALOG fluss_catalog WITH ( 'type' = 'fluss', 'bootstrap.servers' = 'coordinator-server:9123' );
USE CATALOG fluss_catalog;
:::info
By default, catalog configurations are not persisted across Flink SQL client sessions. For more information on how to store catalog configurations, see Flink's Catalog Store.
:::
Run the following SQL to create the Fluss tables used in this guide:
CREATE TABLE fluss_order ( `order_key` BIGINT, `cust_key` INT NOT NULL, `total_price` DECIMAL(15, 2), `order_date` DATE, `order_priority` STRING, `clerk` STRING, `ptime` AS PROCTIME(), PRIMARY KEY (`order_key`) NOT ENFORCED );
CREATE TABLE fluss_customer ( `cust_key` INT NOT NULL, `name` STRING, `phone` STRING, `nation_key` INT NOT NULL, `acctbal` DECIMAL(15, 2), `mktsegment` STRING, PRIMARY KEY (`cust_key`) NOT ENFORCED );
CREATE TABLE fluss_nation ( `nation_key` INT NOT NULL, `name` STRING, PRIMARY KEY (`nation_key`) NOT ENFORCED );
CREATE TABLE enriched_orders ( `order_key` BIGINT, `cust_key` INT NOT NULL, `total_price` DECIMAL(15, 2), `order_date` DATE, `order_priority` STRING, `clerk` STRING, `cust_name` STRING, `cust_phone` STRING, `cust_acctbal` DECIMAL(15, 2), `cust_mktsegment` STRING, `nation_name` STRING, PRIMARY KEY (`order_key`) NOT ENFORCED );
First, run the following SQL to sync data from source tables to Fluss tables:
EXECUTE STATEMENT SET BEGIN INSERT INTO fluss_nation SELECT * FROM `default_catalog`.`default_database`.source_nation; INSERT INTO fluss_customer SELECT * FROM `default_catalog`.`default_database`.source_customer; INSERT INTO fluss_order SELECT * FROM `default_catalog`.`default_database`.source_order; END;
Fluss primary-key tables support high-QPS point lookups on their primary keys, which makes lookup joins efficient. The following statement uses lookup joins to enrich the fluss_order table with information from the fluss_customer and fluss_nation primary-key tables and writes the result to enriched_orders.
INSERT INTO enriched_orders SELECT o.order_key, o.cust_key, o.total_price, o.order_date, o.order_priority, o.clerk, c.name, c.phone, c.acctbal, c.mktsegment, n.name FROM fluss_order o LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c` ON o.cust_key = c.cust_key LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n` ON c.nation_key = n.nation_key;
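To make the semantics of the statement above concrete, here is a small in-memory model in Python. It is only an illustration, not how Flink or Fluss execute the join: plain dicts keyed by cust_key and nation_key stand in for the two primary-key tables, and each order triggers one point lookup per table at the time it is processed (the role of FOR SYSTEM_TIME AS OF o.ptime). As with LEFT JOIN, an order without a matching customer is still emitted, with None in the enriched columns. The rows and the function name enrich_orders are hypothetical.

```python
from typing import Dict, Iterable, Iterator, Optional

# Hypothetical rows standing in for the fluss_customer and fluss_nation tables.
# Each dict is keyed by the table's primary key, so a lookup is one key access.
customers: Dict[int, dict] = {
    7: {"name": "Customer#7", "phone": "555-0107", "nation_key": 3,
        "acctbal": 100.00, "mktsegment": "BUILDING"},
}
nations: Dict[int, dict] = {3: {"name": "NATION#3"}}


def enrich_orders(orders: Iterable[dict], customers: Dict[int, dict],
                  nations: Dict[int, dict]) -> Iterator[dict]:
    """Model the two LEFT lookup joins: per order, one customer lookup, then one nation lookup."""
    for order in orders:
        # The lookup reads the table as it is when this order is processed.
        cust: Optional[dict] = customers.get(order["cust_key"])
        # Like c.nation_key = n.nation_key: no customer means no nation match either.
        nation: Optional[dict] = nations.get(cust["nation_key"]) if cust else None
        yield {
            **order,
            "cust_name": cust["name"] if cust else None,
            "cust_phone": cust["phone"] if cust else None,
            "cust_acctbal": cust["acctbal"] if cust else None,
            "cust_mktsegment": cust["mktsegment"] if cust else None,
            "nation_name": nation["name"] if nation else None,
        }


orders = [
    {"order_key": 1001, "cust_key": 7, "total_price": 266.44},
    {"order_key": 1002, "cust_key": 8, "total_price": 924.43},  # no such customer
]
enriched = list(enrich_orders(orders, customers, nations))
```

The model also shows a property of lookup joins: once an order has been enriched, later changes to the customer table do not revise the row that was already emitted.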
You can now perform real-time analytics directly on Fluss tables. For instance, the following statements switch the SQL client to tableau result mode and batch execution, and then query the enriched_orders table to obtain instant results.
```sql
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';

-- switch to batch mode
SET 'execution.runtime-mode' = 'batch';

-- execute DML job synchronously
SET 'table.dml-sync' = 'true';

-- use limit to query the enriched_orders table
SELECT * FROM enriched_orders LIMIT 2;
```
Sample Output
```
+-----------+----------+-------------+------------+----------------+--------+------------+----------------+--------------+-----------------+-------------+
| order_key | cust_key | total_price | order_date | order_priority |  clerk |  cust_name |     cust_phone | cust_acctbal | cust_mktsegment | nation_name |
+-----------+----------+-------------+------------+----------------+--------+------------+----------------+--------------+-----------------+-------------+
|  23199744 |        9 |      266.44 | 2024-08-29 |           high | Clerk1 |   Joe King |   908.207.8513 |       124.28 |       FURNITURE |      JORDAN |
|  10715776 |        2 |      924.43 | 2024-11-04 |         medium | Clerk3 | Rita Booke | (925) 775-0717 |       172.39 |       FURNITURE |      UNITED |
+-----------+----------+-------------+------------+----------------+--------+------------+----------------+--------------+-----------------+-------------+
```
If you are interested in a specific customer, you can retrieve their details by performing a lookup on the cust_key.
```sql
-- lookup by primary key
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
```
Sample Output
```
+----------+---------------+--------------+------------+---------+------------+
| cust_key |          name |        phone | nation_key | acctbal | mktsegment |
+----------+---------------+--------------+------------+---------+------------+
|        1 | Al K. Seltzer | 817-617-7960 |          1 |  533.41 | AUTOMOBILE |
+----------+---------------+--------------+------------+---------+------------+
```
Note: The query returns quickly because Fluss supports efficient point lookups on tables that define a primary key.
You can use UPDATE and DELETE statements to update or delete rows in Fluss primary-key tables.
```sql
-- update by primary key
UPDATE fluss_customer SET `name` = 'fluss_updated' WHERE `cust_key` = 1;
```
Then look up the row:
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
Sample Output
```
+----------+---------------+--------------+------------+---------+------------+
| cust_key |          name |        phone | nation_key | acctbal | mktsegment |
+----------+---------------+--------------+------------+---------+------------+
|        1 | fluss_updated | 817-617-7960 |          1 |  533.41 | AUTOMOBILE |
+----------+---------------+--------------+------------+---------+------------+
```
Notice that the name column has been updated to fluss_updated.
To delete the row by its primary key, run:

DELETE FROM fluss_customer WHERE `cust_key` = 1;
The following SQL query should return an empty result.
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
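The update, lookup and delete steps all address exactly one row through the primary key cust_key. The following Python sketch models that behaviour with a dict keyed by the primary key, replaying the same sequence on the sample customer row. It is an illustration only: the class PkTable and its methods are hypothetical and do not mirror Fluss's storage engine or client API.

```python
from typing import Any, Dict, Optional


class PkTable:
    """Toy primary-key table: at most one row per key value."""

    def __init__(self, key: str) -> None:
        self.key = key
        self.rows: Dict[Any, Dict[str, Any]] = {}

    def upsert(self, row: Dict[str, Any]) -> None:
        # In this model, a later write for the same key replaces the earlier row.
        self.rows[row[self.key]] = dict(row)

    def lookup(self, key_value: Any) -> Optional[Dict[str, Any]]:
        # Point lookup, like SELECT * ... WHERE cust_key = 1; returns a copy or None.
        row = self.rows.get(key_value)
        return dict(row) if row is not None else None

    def update(self, key_value: Any, **changes: Any) -> bool:
        # Like UPDATE ... SET name = ... WHERE cust_key = 1: other columns keep their values.
        row = self.rows.get(key_value)
        if row is None:
            return False
        row.update(changes)
        return True

    def delete(self, key_value: Any) -> bool:
        # Like DELETE FROM ... WHERE cust_key = 1; False if the key was absent.
        return self.rows.pop(key_value, None) is not None


customers = PkTable(key="cust_key")
customers.upsert({"cust_key": 1, "name": "Al K. Seltzer", "phone": "817-617-7960",
                  "nation_key": 1, "acctbal": 533.41, "mktsegment": "AUTOMOBILE"})
customers.update(1, name="fluss_updated")
after_update = customers.lookup(1)
customers.delete(1)
after_delete = customers.lookup(1)
```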
After finishing the tutorial, run exit to leave the Flink SQL CLI container, and then run
docker compose down -v
to stop and remove all containers (the -v flag also removes their volumes).
Now that you're up and running with Fluss and Flink, check out the Apache Flink Engine docs to learn about more Flink features, or this guide to learn how to set up an observability stack for Fluss and Flink.