This guide demonstrates how to secure your Fluss cluster using two practical examples:
These scenarios will help you understand how to configure authentication and authorization, manage access control, and implement data isolation in real-world use cases.
In this example, we assume there are three users within a department:
admin: A superuser who can manage the entire Fluss cluster.developer: A user that is allowed to read and write data.consumer: A user that is allowed to read data only.Before proceeding with this guide, ensure that Docker and the Docker Compose plugin are installed on your machine. All commands were tested with Docker version 27.4.0 and Docker Compose version v2.30.3.
:::note We encourage you to use a recent version of Docker and Compose v2 (however, Compose v1 might work with a few adaptations). :::
We will use docker compose to spin up the required components for this tutorial.
mkdir fluss-quickstart-security cd fluss-quickstart-security
docker-compose.yml file with the following content:services: #begin Fluss cluster coordinator-server: image: apache/fluss:$FLUSS_DOCKER_VERSION$ command: coordinatorServer depends_on: - zookeeper environment: - | FLUSS_PROPERTIES= zookeeper.address: zookeeper:2181 bind.listeners: INTERNAL://coordinator-server:0, CLIENT://coordinator-server:9123 internal.listener.name: INTERNAL remote.data.dir: /tmp/fluss/remote-data # security properties security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT security.sasl.enabled.mechanisms: PLAIN security.sasl.plain.jaas.config: org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin="admin-pass" user_developer="developer-pass" user_consumer="consumer-pass"; authorizer.enabled: true super.users: User:admin tablet-server: image: apache/fluss:$FLUSS_DOCKER_VERSION$ command: tabletServer depends_on: - coordinator-server environment: - | FLUSS_PROPERTIES= zookeeper.address: zookeeper:2181 bind.listeners: INTERNAL://tablet-server:0, CLIENT://tablet-server:9123 internal.listener.name: INTERNAL tablet-server.id: 0 kv.snapshot.interval: 0s data.dir: /tmp/fluss/data remote.data.dir: /tmp/fluss/remote-data # security properties security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT security.sasl.enabled.mechanisms: PLAIN security.sasl.plain.jaas.config: org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin="admin-pass" user_developer="developer-pass" user_consumer="consumer-pass"; authorizer.enabled: true super.users: User:admin volumes: - shared-tmpfs:/tmp/fluss zookeeper: restart: always image: zookeeper:3.9.2 #end #begin Flink cluster jobmanager: image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$ ports: - "8083:8081" command: jobmanager environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager volumes: - shared-tmpfs:/tmp/paimon taskmanager: image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$ depends_on: - jobmanager command: taskmanager environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 10 taskmanager.memory.process.size: 2048m taskmanager.memory.framework.off-heap.size: 256m volumes: - shared-tmpfs:/tmp/paimon #end volumes: shared-tmpfs: driver: local driver_opts: type: "tmpfs" device: "tmpfs"
The Docker Compose environment consists of the following containers:
CoordinatorServer, a Fluss TabletServer and a ZooKeeper server. It uses SASL/PLAIN for user authentication and defines three users: admin, developer, and consumer. The admin user is a super.users who has full administrative privileges on the Fluss cluster.JobManager and a Flink TaskManager container to execute queries.Note: The apache/fluss-quickstart-flink image is based on flink:1.20.3-java17 and includes the fluss-flink connector to simplify this guide.
docker compose up -d
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode.
Run
docker container ls -a
to check whether all containers are running properly.
You can also visit http://localhost:8083/ to see if Flink is running normally.
First, use the following command to enter the Flink SQL CLI Container:
docker compose exec jobmanager ./sql-client
Create separate catalogs for each user:
CREATE CATALOG admin_catalog WITH ( 'type' = 'fluss', 'bootstrap.servers' = 'coordinator-server:9123', 'client.security.protocol' = 'SASL', 'client.security.sasl.mechanism' = 'PLAIN', 'client.security.sasl.username' = 'admin', 'client.security.sasl.password' = 'admin-pass' );
CREATE CATALOG developer_catalog WITH ( 'type' = 'fluss', 'bootstrap.servers' = 'coordinator-server:9123', 'client.security.protocol' = 'SASL', 'client.security.sasl.mechanism' = 'PLAIN', 'client.security.sasl.username' = 'developer', 'client.security.sasl.password' = 'developer-pass' );
CREATE CATALOG consumer_catalog WITH ( 'type' = 'fluss', 'bootstrap.servers' = 'coordinator-server:9123', 'client.security.protocol' = 'SASL', 'client.security.sasl.mechanism' = 'PLAIN', 'client.security.sasl.username' = 'consumer', 'client.security.sasl.password' = 'consumer-pass' );
As the admin user, add ACLs to grant permissions:
Allow developer user to read and write data:
CALL admin_catalog.sys.add_acl( resource => 'cluster', permission => 'ALLOW', principal => 'User:developer', operation => 'WRITE' ); CALL admin_catalog.sys.add_acl( resource => 'cluster', permission => 'ALLOW', principal => 'User:developer', operation => 'READ' );
Allow consumer user to read data:
CALL admin_catalog.sys.add_acl( resource => 'cluster', permission => 'ALLOW', principal => 'User:consumer', operation => 'READ' );
Lookup the ACLs:
CALL admin_catalog.sys.list_acl( resource => 'cluster' );
Output will show like:
+---------------------------------------------------------------------------------------------+ | result | +---------------------------------------------------------------------------------------------+ | resource="cluster";permission="ALLOW";principal="User:developer";operation="READ";host="*" | | resource="cluster";permission="ALLOW";principal="User:developer";operation="WRITE";host="*" | | resource="cluster";permission="ALLOW";principal="User:consumer";operation="READ";host="*" | +---------------------------------------------------------------------------------------------+ 3 rows in set
Only the admin user can create tables:
-- switch to admin user context USE CATALOG admin_catalog; -- create table using admin credentials CREATE TABLE fluss_order ( `order_key` INT NOT NULL, `total_price` DECIMAL(15, 2), PRIMARY KEY (`order_key`) NOT ENFORCED );
Output:
[INFO] Execute statement succeeded.
The developer user cannot create tables:
-- switch to developer user context USE CATALOG developer_catalog; -- create table using developer credentials CREATE TABLE fluss_order1( `order_key` INT NOT NULL, `total_price` DECIMAL(15, 2), PRIMARY KEY (`order_key`) NOT ENFORCED );
Output:
[ERROR] Could not execute SQL statement. Reason: org.apache.fluss.exception.AuthorizationException: Principal FlussPrincipal{name='developer', type='User'} have no authorization to operate CREATE on resource Resource{type=DATABASE, name='fluss'}
The consumer user also cannot create tables:
-- switch to consumer user context USE CATALOG consumer_catalog; -- create table using consumer credentials CREATE TABLE fluss_order2( `order_key` INT NOT NULL, `total_price` DECIMAL(15, 2), PRIMARY KEY (`order_key`) NOT ENFORCED );
Output:
[ERROR] Could not execute SQL statement. Reason: org.apache.fluss.exception.AuthorizationException: Principal FlussPrincipal{name='consumer', type='User'} have no authorization to operate CREATE on resource Resource{type=DATABASE, name='fluss'}
Write data using the developer user:
-- switch to developer user context USE CATALOG developer_catalog; -- write data using developer credentials INSERT INTO fluss_order VALUES (1, 1.0);
The job should succeed as shown in the Flink UI.
Attempting to write data using the consumer user will fail in the Flink UI:
-- switch to consumer user context USE CATALOG consumer_catalog; -- write data using consumer credentials INSERT INTO fluss_order VALUES (1, 1.0);
Output:
Caused by: java.util.concurrent.CompletionException: org.apache.fluss.exception.AuthorizationException: No WRITE permission among all the tables: [fluss.fluss_order]
Read data using the consumer user:
SET 'execution.runtime-mode' = 'batch'; -- use tableau result mode SET 'sql-client.execution.result-mode' = 'tableau'; -- switch to consumer user context USE CATALOG consumer_catalog; -- read data using consumer credentials SELECT * FROM `consumer_catalog`.`fluss`.`fluss_order` LIMIT 10;
Output:
+-----------+-------------+ | order_key | total_price | +-----------+-------------+ | 1 | 1.00 | +-----------+-------------+ 1 row in set (5.27 seconds)
Attempting to read data using the developer user also get the same result:
SET 'execution.runtime-mode' = 'batch'; -- use tableau result mode SET 'sql-client.execution.result-mode' = 'tableau'; -- switch to developer user context USE CATALOG developer_catalog; -- read data using developer credentials SELECT * FROM `developer_catalog`.`fluss`.`fluss_order` LIMIT 10;
This example shows how to enable multi-tenant isolation in a Fluss cluster.
We'll demonstrate two departments — marketing and finance — each with its own dedicated database. The cluster includes the following users:
admin: A superuser with full access.marketing: A user who can only access the marketing_db database.finance: A user who can only access the finance_db database.All the steps are same as Example 1, but update the JAAS configuration to include the new users:
services: #begin Fluss cluster coordinator-server: image: apache/fluss:$FLUSS_DOCKER_VERSION$ command: coordinatorServer depends_on: - zookeeper environment: - | FLUSS_PROPERTIES= zookeeper.address: zookeeper:2181 bind.listeners: INTERNAL://coordinator-server:0, CLIENT://coordinator-server:9123 internal.listener.name: INTERNAL remote.data.dir: /tmp/fluss/remote-data # security properties security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT security.sasl.enabled.mechanisms: PLAIN security.sasl.plain.jaas.config: org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin="admin-pass" user_marketing="marketing-pass" user_finance="finance-pass"; authorizer.enabled: true super.users: User:admin tablet-server: image: apache/fluss:$FLUSS_DOCKER_VERSION$ command: tabletServer depends_on: - coordinator-server environment: - | FLUSS_PROPERTIES= zookeeper.address: zookeeper:2181 bind.listeners: INTERNAL://tablet-server:0, CLIENT://tablet-server:9123 internal.listener.name: INTERNAL tablet-server.id: 0 kv.snapshot.interval: 0s data.dir: /tmp/fluss/data remote.data.dir: /tmp/fluss/remote-data # security properties security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT security.sasl.enabled.mechanisms: PLAIN security.sasl.plain.jaas.config: org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin="admin-pass" user_marketing="marketing-pass" user_finance="finance-pass"; authorizer.enabled: true super.users: User:admin volumes: - shared-tmpfs:/tmp/fluss zookeeper: restart: always image: zookeeper:3.9.2 #end #begin Flink cluster jobmanager: image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$ ports: - "8083:8081" command: jobmanager environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager volumes: - shared-tmpfs:/tmp/paimon taskmanager: image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$ depends_on: - jobmanager command: taskmanager environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 10 taskmanager.memory.process.size: 2048m taskmanager.memory.framework.off-heap.size: 256m volumes: - shared-tmpfs:/tmp/paimon #end volumes: shared-tmpfs: driver: local driver_opts: type: "tmpfs" device: "tmpfs"
First, use the following command to enter the Flink SQL CLI Container:
docker compose exec jobmanager ./sql-client
Create separate catalogs for the admin, marketing, and finance users:
CREATE CATALOG admin_catalog WITH ( 'type' = 'fluss', 'bootstrap.servers' = 'coordinator-server:9123', 'client.security.protocol' = 'SASL', 'client.security.sasl.mechanism' = 'PLAIN', 'client.security.sasl.username' = 'admin', 'client.security.sasl.password' = 'admin-pass' );
CREATE CATALOG marketing_catalog WITH ( 'type' = 'fluss', 'bootstrap.servers' = 'coordinator-server:9123', 'client.security.protocol' = 'SASL', 'client.security.sasl.mechanism' = 'PLAIN', 'client.security.sasl.username' = 'marketing', 'client.security.sasl.password' = 'marketing-pass' );
CREATE CATALOG finance_catalog WITH ( 'type' = 'fluss', 'bootstrap.servers' = 'coordinator-server:9123', 'client.security.protocol' = 'SASL', 'client.security.sasl.mechanism' = 'PLAIN', 'client.security.sasl.username' = 'finance', 'client.security.sasl.password' = 'finance-pass' );
As the admin user, create two databases and assign appropriate ACLs:
CREATE DATABASE `admin_catalog`.`marketing_db`; CALL admin_catalog.sys.add_acl( resource => 'cluster.marketing_db', permission => 'ALLOW', principal => 'User:marketing', operation => 'ALL' ); CREATE DATABASE `admin_catalog`.`finance_db`; CALL admin_catalog.sys.add_acl( resource => 'cluster.finance_db', permission => 'ALLOW', principal => 'User:finance', operation => 'ALL' );
Lookup the ACLs:
CALL admin_catalog.sys.list_acl( resource => 'ANY' );
Output will show like:
+--------------------------------------------------------------------------------------------------------+ | result | +--------------------------------------------------------------------------------------------------------+ | resource="cluster.marketing_db";permission="ALLOW";principal="User:marketing";operation="ALL";host="*" | | resource="cluster.finance_db";permission="ALLOW";principal="User:finance";operation="ALL";host="*" | +--------------------------------------------------------------------------------------------------------+ 2 rows in set
The marketing user can only see the marketing_db database
-- switch to marketing user context USE CATALOG marketing_catalog; -- show databases using marketing user credentials SHOW DATABASES;
Output:
+---------------+ | database name | +---------------+ | marketing_db | +---------------+ 1 row in set
The finance user can only see the finance_db database:
-- switch to finance user context USE CATALOG finance_catalog; -- show databases using finance user credentials SHOW DATABASES;
Output:
+---------------+ | database name | +---------------+ | finance_db | +---------------+ 1 row in set
The marketing user can operate on their own database:
-- switch to marketing user context USE CATALOG marketing_catalog; -- create table using marketing user credentials CREATE TABLE `marketing_db`.`order` ( `order_key` INT NOT NULL, `total_price` DECIMAL(15, 2), PRIMARY KEY (`order_key`) NOT ENFORCED );
Output:
[INFO] Execute statement succeeded.
The finance user cannot access the marketing database:
-- switch to finance user context USE CATALOG finance_catalog; -- create table using finance user credentials CREATE TABLE `marketing_db`.`order` ( `order_key` INT NOT NULL, `total_price` DECIMAL(15, 2), PRIMARY KEY (`order_key`) NOT ENFORCED );
Output:
[ERROR] Could not execute SQL statement. Reason: org.apache.fluss.exception.AuthorizationException: Principal FlussPrincipal{name='finance', type='User'} have no authorization to operate CREATE on resource Resource{type=DATABASE, name='marketing_db'}