import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
This guide helps you set up a basic streaming lakehouse using Fluss with Paimon or Iceberg, and shows how Fluss's Union Read feature combines real-time data in Fluss with historical data in the lake.
Before proceeding with this guide, ensure that Docker and the Docker Compose plugin are installed on your machine. All commands were tested with Docker version 27.4.0 and Docker Compose version v2.30.3.
:::note
We encourage you to use a recent version of Docker and Compose v2 (Compose v1 might work with a few adaptations).
:::
For the Paimon setup, we use Docker Compose to spin up the required components for this tutorial.
Create a working directory for this guide:

```shell
mkdir fluss-quickstart-paimon
cd fluss-quickstart-paimon
```
Create a `docker-compose.yml` file with the following content:

```yaml
services:
  #begin Fluss cluster
  coordinator-server:
    image: apache/fluss:$FLUSS_DOCKER_VERSION$
    command: coordinatorServer
    depends_on:
      - zookeeper
    environment:
      - |
        FLUSS_PROPERTIES=
        zookeeper.address: zookeeper:2181
        bind.listeners: FLUSS://coordinator-server:9123
        remote.data.dir: /tmp/fluss/remote-data
        datalake.format: paimon
        datalake.paimon.metastore: filesystem
        datalake.paimon.warehouse: /tmp/paimon
    volumes:
      - shared-tmpfs:/tmp/paimon
  tablet-server:
    image: apache/fluss:$FLUSS_DOCKER_VERSION$
    command: tabletServer
    depends_on:
      - coordinator-server
    environment:
      - |
        FLUSS_PROPERTIES=
        zookeeper.address: zookeeper:2181
        bind.listeners: FLUSS://tablet-server:9123
        data.dir: /tmp/fluss/data
        remote.data.dir: /tmp/fluss/remote-data
        kv.snapshot.interval: 0s
        datalake.format: paimon
        datalake.paimon.metastore: filesystem
        datalake.paimon.warehouse: /tmp/paimon
    volumes:
      - shared-tmpfs:/tmp/paimon
  zookeeper:
    restart: always
    image: zookeeper:3.9.2
  #end
  #begin Flink cluster
  jobmanager:
    image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
    ports:
      - "8083:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      - shared-tmpfs:/tmp/paimon
  taskmanager:
    image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
        taskmanager.memory.process.size: 2048m
        taskmanager.memory.framework.off-heap.size: 256m
    volumes:
      - shared-tmpfs:/tmp/paimon
  #end

volumes:
  shared-tmpfs:
    driver: local
    driver_opts:
      type: "tmpfs"
      device: "tmpfs"
```
The Docker Compose environment consists of the following containers:
- Fluss cluster: a Fluss `CoordinatorServer`, a Fluss `TabletServer`, and a `ZooKeeper` server.
- Flink cluster: a Flink `JobManager` and a Flink `TaskManager` container to execute queries.

Note: The `apache/fluss-quickstart-flink` image is based on `flink:1.20.3-java17` and includes the `fluss-flink`, `paimon-flink`, and `flink-connector-faker` jars to simplify this guide.
To start all containers, run:

```shell
docker compose up -d
```
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode.
Run `docker container ls -a` to check whether all containers are running properly.
You can also visit http://localhost:8083/ to see if Flink is running normally.
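If you prefer a scripted check over reading the `docker container ls -a` output, the sketch below compares the running Compose services against the five services defined in `docker-compose.yml` (both the Paimon and the Iceberg setups use the same service names). `check_services` is a hypothetical helper, not part of Fluss or Docker; the usage comment assumes Compose v2, where `docker compose ps --status running --services` prints one running service name per line.

```shell
# check_services: hypothetical helper (not part of Fluss or Docker).
# Reads running service names, one per line, on stdin and fails if any
# service passed as an argument is missing from that list.
check_services() {
  running=$(cat)
  missing=""
  for svc in "$@"; do
    # -x: the whole line must equal the service name
    if ! printf '%s\n' "$running" | grep -qx -- "$svc"; then
      missing="$missing $svc"
    fi
  done
  if [ -n "$missing" ]; then
    echo "not running:$missing" >&2
    return 1
  fi
  echo "all expected services are running"
}

# Usage, from the directory that contains docker-compose.yml (Compose v2):
#   docker compose ps --status running --services \
#     | check_services zookeeper coordinator-server tablet-server jobmanager taskmanager
```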
:::note
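- If you want to run this guide with your own Flink environment instead of the provided image, put the required jars (such as `fluss-flink`, `paimon-flink`, and `flink-connector-faker`) into `FLINK_HOME/lib/`.
- All the following commands involving `docker compose` should be executed in the created working directory that contains the `docker-compose.yml` file.
:::

Congratulations, you are all set!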
For the Iceberg setup, we use Docker Compose to spin up the required components for this tutorial.
Create a working directory for this guide:

```shell
mkdir fluss-quickstart-iceberg
cd fluss-quickstart-iceberg
```
Create a `lib` directory and download the required Hadoop jar file:

```shell
mkdir lib
wget -O lib/hadoop-apache-3.3.5-2.jar https://repo1.maven.org/maven2/io/trino/hadoop/hadoop-apache/3.3.5-2/hadoop-apache-3.3.5-2.jar
```
This jar file provides Hadoop 3.3.5 dependencies required for Iceberg's Hadoop catalog integration.
:::info
The `lib` directory serves as a staging area for additional jars needed by the Fluss coordinator server. The `docker-compose.yml` file below mounts this directory and copies all jars to `/opt/fluss/plugins/iceberg/` inside the coordinator container at startup.

You can add more jars to this `lib` directory based on your requirements, for example the Iceberg AWS bundle (`iceberg-aws-bundle`) for AWS integrations such as S3.

Any jar placed in the `lib` directory will be automatically loaded by the Fluss coordinator server, making it available for Iceberg integration.
:::
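Because the coordinator's entrypoint in the `docker-compose.yml` file below runs `cp -v /tmp/lib/*.jar ... && exec ...`, an empty `lib` directory makes `cp` fail, and the coordinator then never starts. The pre-flight sketch below, built around a hypothetical `check_lib_dir` helper, checks that `lib` contains at least one non-empty jar before you run `docker compose up -d`. It also flags zero-byte files, which a failed `wget -O` download can leave behind.

```shell
# check_lib_dir: hypothetical pre-flight helper (not part of Fluss).
# Succeeds if DIR (default: lib) holds at least one non-empty *.jar file.
check_lib_dir() {
  dir=${1:-lib}
  found=0
  for jar in "$dir"/*.jar; do
    [ -e "$jar" ] || continue          # glob matched nothing
    if [ ! -s "$jar" ]; then           # zero bytes: likely a failed download
      echo "empty file (failed download?): $jar" >&2
      continue
    fi
    echo "found: $jar"
    found=1
  done
  if [ "$found" -eq 0 ]; then
    echo "no usable .jar files in $dir/" >&2
    return 1
  fi
}

# Usage, from the fluss-quickstart-iceberg directory:
#   check_lib_dir lib && docker compose up -d
```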
Create a `docker-compose.yml` file with the following content:

```yaml
services:
  zookeeper:
    restart: always
    image: zookeeper:3.9.2
  coordinator-server:
    image: apache/fluss:$FLUSS_DOCKER_VERSION$
    depends_on:
      - zookeeper
    environment:
      - |
        FLUSS_PROPERTIES=
        zookeeper.address: zookeeper:2181
        bind.listeners: FLUSS://coordinator-server:9123
        remote.data.dir: /tmp/fluss/remote-data
        datalake.format: iceberg
        datalake.iceberg.type: hadoop
        datalake.iceberg.warehouse: /tmp/iceberg
    volumes:
      - shared-tmpfs:/tmp/iceberg
      - ./lib:/tmp/lib
    entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/fluss/plugins/iceberg/ && exec /docker-entrypoint.sh coordinatorServer"]
  tablet-server:
    image: apache/fluss:$FLUSS_DOCKER_VERSION$
    command: tabletServer
    depends_on:
      - coordinator-server
    environment:
      - |
        FLUSS_PROPERTIES=
        zookeeper.address: zookeeper:2181
        bind.listeners: FLUSS://tablet-server:9123
        data.dir: /tmp/fluss/data
        remote.data.dir: /tmp/fluss/remote-data
        kv.snapshot.interval: 0s
        datalake.format: iceberg
        datalake.iceberg.type: hadoop
        datalake.iceberg.warehouse: /tmp/iceberg
    volumes:
      - shared-tmpfs:/tmp/iceberg
  jobmanager:
    image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
    ports:
      - "8083:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      - shared-tmpfs:/tmp/iceberg
  taskmanager:
    image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
        taskmanager.memory.process.size: 2048m
        taskmanager.memory.framework.off-heap.size: 256m
    volumes:
      - shared-tmpfs:/tmp/iceberg

volumes:
  shared-tmpfs:
    driver: local
    driver_opts:
      type: "tmpfs"
      device: "tmpfs"
```
The Docker Compose environment consists of the following containers:
- Fluss cluster: a Fluss `CoordinatorServer`, a Fluss `TabletServer`, and a `ZooKeeper` server.
- Flink cluster: a Flink `JobManager` and a Flink `TaskManager` container to execute queries.

Note: The `apache/fluss-quickstart-flink` image is based on `flink:1.20.3-java17` and includes the `fluss-flink`, `iceberg-flink`, and `flink-connector-faker` jars to simplify this guide.
To start all containers, run:

```shell
docker compose up -d
```
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode.
Run `docker container ls -a` to check whether all containers are running properly.
You can also visit http://localhost:8083/ to see if Flink is running normally.
:::note
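- If you want to run this guide with your own Flink environment instead of the provided image, put the required jars (such as `fluss-flink`, `iceberg-flink`, and `flink-connector-faker`) into `FLINK_HOME/lib/`.
- All the following commands involving `docker compose` should be executed in the created working directory that contains the `docker-compose.yml` file.
:::

Congratulations, you are all set!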
First, use the following command to enter the Flink SQL CLI Container:
```shell
docker compose exec jobmanager ./sql-client
```
Note: To simplify this guide, three temporary tables have been pre-created with the faker connector to generate data. You can view their schemas by running the following commands:
```sql
SHOW CREATE TABLE source_customer;
SHOW CREATE TABLE source_order;
SHOW CREATE TABLE source_nation;
```
Use the following SQL to create a Fluss catalog:
```sql
CREATE CATALOG fluss_catalog WITH (
    'type' = 'fluss',
    'bootstrap.servers' = 'coordinator-server:9123'
);

USE CATALOG fluss_catalog;
```
:::info
By default, catalog configurations are not persisted across Flink SQL client sessions. For further information on how to store catalog configurations, see Flink's Catalog Store.
:::
Run the following SQL to create the Fluss tables used in this guide:
```sql
CREATE TABLE fluss_order (
    `order_key` BIGINT,
    `cust_key` INT NOT NULL,
    `total_price` DECIMAL(15, 2),
    `order_date` DATE,
    `order_priority` STRING,
    `clerk` STRING,
    `ptime` AS PROCTIME(),
    PRIMARY KEY (`order_key`) NOT ENFORCED
);

CREATE TABLE fluss_customer (
    `cust_key` INT NOT NULL,
    `name` STRING,
    `phone` STRING,
    `nation_key` INT NOT NULL,
    `acctbal` DECIMAL(15, 2),
    `mktsegment` STRING,
    PRIMARY KEY (`cust_key`) NOT ENFORCED
);

CREATE TABLE fluss_nation (
    `nation_key` INT NOT NULL,
    `name` STRING,
    PRIMARY KEY (`nation_key`) NOT ENFORCED
);
```
First, run the following SQL to sync data from source tables to Fluss tables:
```sql
EXECUTE STATEMENT SET
BEGIN
    INSERT INTO fluss_nation SELECT * FROM `default_catalog`.`default_database`.source_nation;
    INSERT INTO fluss_customer SELECT * FROM `default_catalog`.`default_database`.source_customer;
    INSERT INTO fluss_order SELECT * FROM `default_catalog`.`default_database`.source_order;
END;
```
To integrate with Apache Paimon, you need to start the Lakehouse Tiering Service. Open a new terminal, navigate to the `fluss-quickstart-paimon` directory, and execute the following command within this directory to start the service:
```shell
docker compose exec jobmanager \
    /opt/flink/bin/flink run \
    /opt/flink/opt/fluss-flink-tiering-$FLUSS_VERSION$.jar \
    --fluss.bootstrap.servers coordinator-server:9123 \
    --datalake.format paimon \
    --datalake.paimon.metastore filesystem \
    --datalake.paimon.warehouse /tmp/paimon
```
You should see a Flink Job to tier data from Fluss to Paimon running in the Flink Web UI.
To integrate with Apache Iceberg, you need to start the Lakehouse Tiering Service. Open a new terminal, navigate to the `fluss-quickstart-iceberg` directory, and execute the following command within this directory to start the service:
```shell
docker compose exec jobmanager \
    /opt/flink/bin/flink run \
    /opt/flink/opt/fluss-flink-tiering-$FLUSS_VERSION$.jar \
    --fluss.bootstrap.servers coordinator-server:9123 \
    --datalake.format iceberg \
    --datalake.iceberg.type hadoop \
    --datalake.iceberg.warehouse /tmp/iceberg
```
You should see a Flink Job to tier data from Fluss to Iceberg running in the Flink Web UI.
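The tiering flags above mirror the `datalake.*` entries in the `FLUSS_PROPERTIES` of `docker-compose.yml` (for example, `datalake.paimon.warehouse: /tmp/paimon` becomes `--datalake.paimon.warehouse /tmp/paimon`). To keep the two in sync, you could derive the flags from the same `key: value` lines. The sketch below does this with a hypothetical `datalake_flags` helper. It assumes every `datalake.*` server property maps one-to-one onto a tiering flag of the same name, which holds for the keys used in this guide but is not verified here for other keys.

```shell
# datalake_flags: hypothetical helper (not part of Fluss).
# Reads "key: value" lines (as in FLUSS_PROPERTIES) on stdin, keeps only
# datalake.* keys, and prints them as "--key value" tiering flags.
datalake_flags() {
  awk '
    /^[ \t]*datalake\./ {
      line = $0
      sub(/^[ \t]+/, "", line)     # drop YAML indentation
      i = index(line, ":")          # split at the first colon only
      key = substr(line, 1, i - 1)
      val = substr(line, i + 1)
      sub(/^[ \t]+/, "", val)
      printf "%s--%s %s", sep, key, val
      sep = " "
    }
    END { print "" }
  '
}

# Usage sketch (assumes the values contain no spaces, since $flags is
# expanded unquoted on purpose so that it splits into separate arguments):
#   flags=$(printf '%s\n' 'datalake.format: paimon' \
#     'datalake.paimon.metastore: filesystem' \
#     'datalake.paimon.warehouse: /tmp/paimon' | datalake_flags)
#   docker compose exec jobmanager /opt/flink/bin/flink run \
#     /opt/flink/opt/fluss-flink-tiering-$FLUSS_VERSION$.jar \
#     --fluss.bootstrap.servers coordinator-server:9123 $flags
```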
By default, tables are created with data lake integration disabled, meaning the Lakehouse Tiering Service will not tier the table's data to the data lake.
To enable lakehouse functionality as a tiered storage solution for a table, you must create the table with the configuration option table.datalake.enabled = true. Return to the SQL client and execute the following SQL statement to create a table with data lake integration enabled:
```sql
CREATE TABLE datalake_enriched_orders (
    `order_key` BIGINT,
    `cust_key` INT NOT NULL,
    `total_price` DECIMAL(15, 2),
    `order_date` DATE,
    `order_priority` STRING,
    `clerk` STRING,
    `cust_name` STRING,
    `cust_phone` STRING,
    `cust_acctbal` DECIMAL(15, 2),
    `cust_mktsegment` STRING,
    `nation_name` STRING,
    PRIMARY KEY (`order_key`) NOT ENFORCED
) WITH (
    'table.datalake.enabled' = 'true',
    'table.datalake.freshness' = '30s'
);
```
Next, stream data into the datalake-enabled table `datalake_enriched_orders`:
```sql
-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';

-- insert tuples into datalake_enriched_orders
INSERT INTO datalake_enriched_orders
SELECT o.order_key,
       o.cust_key,
       o.total_price,
       o.order_date,
       o.order_priority,
       o.clerk,
       c.name,
       c.phone,
       c.acctbal,
       c.mktsegment,
       n.name
FROM fluss_order o
LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
    ON o.cust_key = c.cust_key
LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
    ON c.nation_key = n.nation_key;
```
By default, tables are created with data lake integration disabled, meaning the Lakehouse Tiering Service will not tier the table's data to the data lake.
To enable lakehouse functionality as a tiered storage solution for a table, you must create the table with the configuration option table.datalake.enabled = true. Return to the SQL client and execute the following SQL statement to create a table with data lake integration enabled:
```sql
CREATE TABLE datalake_enriched_orders (
    `order_key` BIGINT,
    `cust_key` INT NOT NULL,
    `total_price` DECIMAL(15, 2),
    `order_date` DATE,
    `order_priority` STRING,
    `clerk` STRING,
    `cust_name` STRING,
    `cust_phone` STRING,
    `cust_acctbal` DECIMAL(15, 2),
    `cust_mktsegment` STRING,
    `nation_name` STRING
) WITH (
    'table.datalake.enabled' = 'true',
    'table.datalake.freshness' = '30s'
);
```
Next, stream data into the datalake-enabled table `datalake_enriched_orders`:
```sql
-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';

-- insert tuples into datalake_enriched_orders
INSERT INTO datalake_enriched_orders
SELECT o.order_key,
       o.cust_key,
       o.total_price,
       o.order_date,
       o.order_priority,
       o.clerk,
       c.name,
       c.phone,
       c.acctbal,
       c.mktsegment,
       n.name
FROM (
    SELECT *, PROCTIME() as ptime
    FROM `default_catalog`.`default_database`.source_order
) o
LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF o.ptime AS c
    ON o.cust_key = c.cust_key
LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF o.ptime AS n
    ON c.nation_key = n.nation_key;
```
The data for the datalake_enriched_orders table is stored in Fluss (for real-time data) and Paimon (for historical data).
When querying the `datalake_enriched_orders` table, Fluss uses a union operation that combines data from both Fluss and Paimon into a complete result set covering both real-time and historical data.

If you wish to query only the data stored in Paimon, which offers high-performance access without the overhead of unioning data, you can use the `datalake_enriched_orders$lake` table by appending the `$lake` suffix. This approach also enables all the optimizations and features of a Flink Paimon table source, including system tables such as `datalake_enriched_orders$lake$snapshots`.
To query the snapshots directly from Paimon, use the following SQL:
```sql
-- switch to batch mode
SET 'execution.runtime-mode' = 'batch';

-- query snapshots in paimon
SELECT snapshot_id, total_record_count FROM datalake_enriched_orders$lake$snapshots;
```
Sample Output:
```
+-------------+--------------------+
| snapshot_id | total_record_count |
+-------------+--------------------+
|           1 |                650 |
+-------------+--------------------+
```
Note: Make sure to wait for the configured `table.datalake.freshness` interval (~30s) to pass before querying the snapshots; otherwise, the result may be empty.
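Instead of waiting a fixed amount of time, you can poll until the first snapshot has been committed. The sketch below defines a generic retry helper, `wait_for` (hypothetical, not part of Fluss or Flink). Its usage comment waits for the Paimon snapshot file `snapshot-1`, assuming the directory layout shown in the sample file listing later in this guide.

```shell
# wait_for LIMIT INTERVAL COMMAND [ARGS...]: hypothetical helper.
# Re-runs COMMAND every INTERVAL seconds until it succeeds; gives up with
# status 1 once roughly LIMIT seconds of retries have elapsed.
wait_for() {
  limit=$1
  interval=$2
  shift 2
  elapsed=0
  until "$@"; do
    if [ "$elapsed" -ge "$limit" ]; then
      echo "wait_for: gave up after ${elapsed}s: $*" >&2
      return 1
    fi
    sleep "$interval"
    elapsed=$((elapsed + interval))
  done
}

# Usage sketch (assumes the first tiering commit creates snapshot-1):
#   wait_for 120 5 docker compose exec -T taskmanager \
#     test -f /tmp/paimon/fluss.db/datalake_enriched_orders/snapshot/snapshot-1
```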
Run the following SQL to do analytics on Paimon data:
```sql
-- to sum prices of all orders in paimon
SELECT sum(total_price) as sum_price FROM datalake_enriched_orders$lake;
```
Sample Output:
```
+------------+
|  sum_price |
+------------+
| 1669519.92 |
+------------+
```
To achieve results with sub-second data freshness, you can query the table directly, which seamlessly unifies data from both Fluss and Paimon:
```sql
-- to sum prices of all orders in fluss and paimon
SELECT sum(total_price) as sum_price FROM datalake_enriched_orders;
```
The result looks like:
```
+------------+
|  sum_price |
+------------+
| 1777908.36 |
+------------+
```
You can execute the real-time analytics query multiple times, and the results will vary with each run as new data is continuously written to Fluss in real-time.
Finally, you can use the following command to view the files stored in Paimon:
```shell
docker compose exec taskmanager tree /tmp/paimon/fluss.db
```
Sample Output:
```
/tmp/paimon/fluss.db
└── datalake_enriched_orders
    ├── bucket-0
    │   ├── changelog-aef1810f-85b2-4eba-8eb8-9b136dec5bdb-0.orc
    │   └── data-aef1810f-85b2-4eba-8eb8-9b136dec5bdb-1.orc
    ├── manifest
    │   ├── manifest-aaa007e1-81a2-40b3-ba1f-9df4528bc402-0
    │   ├── manifest-aaa007e1-81a2-40b3-ba1f-9df4528bc402-1
    │   ├── manifest-list-ceb77e1f-7d17-4160-9e1f-f334918c6e0d-0
    │   ├── manifest-list-ceb77e1f-7d17-4160-9e1f-f334918c6e0d-1
    │   └── manifest-list-ceb77e1f-7d17-4160-9e1f-f334918c6e0d-2
    ├── schema
    │   └── schema-0
    └── snapshot
        ├── EARLIEST
        ├── LATEST
        └── snapshot-1
```
The files adhere to Paimon's standard format, enabling seamless querying with other engines such as Spark and Trino.
The data for the datalake_enriched_orders table is stored in Fluss (for real-time data) and Iceberg (for historical data).
When querying the `datalake_enriched_orders` table, Fluss uses a union operation that combines data from both Fluss and Iceberg into a complete result set covering both real-time and historical data.

If you wish to query only the data stored in Iceberg, which offers high-performance access without the overhead of unioning data, you can use the `datalake_enriched_orders$lake` table by appending the `$lake` suffix. This approach also enables all the optimizations and features of a Flink Iceberg table source, including system tables such as `datalake_enriched_orders$lake$snapshots`.
To query the snapshots directly from Iceberg, use the following SQL:

```sql
-- switch to batch mode
SET 'execution.runtime-mode' = 'batch';

-- query snapshots in iceberg
SELECT snapshot_id, operation FROM datalake_enriched_orders$lake$snapshots;
```
Sample Output:
```
+---------------------+-----------+
|         snapshot_id | operation |
+---------------------+-----------+
| 7792523713868625335 |    append |
| 7960217942125627573 |    append |
+---------------------+-----------+
```
Note: Make sure to wait for the configured `table.datalake.freshness` interval (~30s) to pass before querying the snapshots; otherwise, the result may be empty.
Run the following SQL to do analytics on Iceberg data:
```sql
-- to sum prices of all orders in iceberg
SELECT sum(total_price) as sum_price FROM datalake_enriched_orders$lake;
```
Sample Output:
```
+-----------+
| sum_price |
+-----------+
| 432880.93 |
+-----------+
```
To achieve results with sub-second data freshness, you can query the table directly, which seamlessly unifies data from both Fluss and Iceberg:
```sql
-- to sum prices of all orders (combining fluss and iceberg data)
SELECT sum(total_price) as sum_price FROM datalake_enriched_orders;
```
Sample Output:
```
+-----------+
| sum_price |
+-----------+
| 558660.03 |
+-----------+
```
You can execute the real-time analytics query multiple times, and the results will vary with each run as new data is continuously written to Fluss in real-time.
Finally, you can use the following command to view the files stored in Iceberg:
```shell
docker compose exec taskmanager tree /tmp/iceberg/fluss
```
Sample Output:
```
/tmp/iceberg/fluss
└── datalake_enriched_orders
    ├── data
    │   └── 00000-0-abc123.parquet
    └── metadata
        ├── snap-1234567890123456789-1-abc123.avro
        └── v1.metadata.json
```
The files adhere to Iceberg's standard format, enabling seamless querying with other engines such as Spark and Trino.
After finishing the tutorial, run `exit` to leave the Flink SQL CLI container, and then run the following command to stop all containers and remove the `shared-tmpfs` volume:

```shell
docker compose down -v
```