Flink lookup joins are important because they enable efficient, real-time enrichment of streaming data with reference data, a common requirement in many real-time analytics and processing scenarios.
-- Lookup joins run in asynchronous mode by default. To run a lookup join in
-- synchronous mode, set the SQL hint 'lookup.async' = 'false' (see the second
-- INSERT below).
USE CATALOG fluss_catalog;

-- IF NOT EXISTS keeps this script re-runnable when my_db already exists.
CREATE DATABASE IF NOT EXISTS my_db;

USE my_db;

-- Probe (streaming) side of the lookup joins below; keyed by o_orderkey.
CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
    `o_orderkey`      INT NOT NULL,
    `o_custkey`       INT NOT NULL,
    `o_orderstatus`   CHAR(1) NOT NULL,
    `o_totalprice`    DECIMAL(15, 2) NOT NULL,
    `o_orderdate`     DATE NOT NULL,
    `o_orderpriority` CHAR(15) NOT NULL,
    `o_clerk`         CHAR(15) NOT NULL,
    `o_shippriority`  INT NOT NULL,
    `o_comment`       STRING NOT NULL,
    `o_dt`            STRING NOT NULL,
    PRIMARY KEY (o_orderkey) NOT ENFORCED
);

-- Dimension table looked up by its primary key c_custkey.
CREATE TABLE `fluss_catalog`.`my_db`.`customer` (
    `c_custkey`    INT NOT NULL,
    `c_name`       STRING NOT NULL,
    `c_address`    STRING NOT NULL,
    `c_nationkey`  INT NOT NULL,
    `c_phone`      CHAR(15) NOT NULL,
    `c_acctbal`    DECIMAL(15, 2) NOT NULL,
    `c_mktsegment` CHAR(10) NOT NULL,
    `c_comment`    STRING NOT NULL,
    PRIMARY KEY (c_custkey) NOT ENFORCED
);

-- Sink using the 'blackhole' connector (rows are discarded); it only serves
-- to drive the example join queries.
CREATE TEMPORARY TABLE lookup_join_sink (
    order_key        INT NOT NULL,
    order_totalprice DECIMAL(15, 2) NOT NULL,
    customer_name    STRING NOT NULL,
    customer_address STRING NOT NULL
) WITH ('connector' = 'blackhole');

-- Lookup join in asynchronous mode (the default).
-- proctime() supplies the processing-time attribute used by FOR SYSTEM_TIME AS OF.
INSERT INTO lookup_join_sink
SELECT
    `o`.`o_orderkey`,
    `o`.`o_totalprice`,
    `c`.`c_name`,
    `c`.`c_address`
FROM (
    SELECT `orders`.*, proctime() AS ptime
    FROM `orders`
) AS `o`
LEFT JOIN `customer` FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
    ON `o`.`o_custkey` = `c`.`c_custkey`;

-- Lookup join in synchronous mode, selected per table via the OPTIONS hint.
INSERT INTO lookup_join_sink
SELECT
    `o`.`o_orderkey`,
    `o`.`o_totalprice`,
    `c`.`c_name`,
    `c`.`c_address`
FROM (
    SELECT `orders`.*, proctime() AS ptime
    FROM `orders`
) AS `o`
LEFT JOIN `customer` /*+ OPTIONS('lookup.async' = 'false') */
    FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
    ON `o`.`o_custkey` = `c`.`c_custkey`;
Continuing from the previous example, suppose the dimension table is a Fluss partitioned primary key table, defined as follows:
-- Partitioned dimension table: the primary key (c_custkey, dt) includes the
-- partition column dt, and partitions are created automatically with a
-- yearly time unit.
CREATE TABLE `fluss_catalog`.`my_db`.`customer_partitioned` (
    `c_custkey`    INT NOT NULL,
    `c_name`       STRING NOT NULL,
    `c_address`    STRING NOT NULL,
    `c_nationkey`  INT NOT NULL,
    `c_phone`      CHAR(15) NOT NULL,
    `c_acctbal`    DECIMAL(15, 2) NOT NULL,
    `c_mktsegment` CHAR(10) NOT NULL,
    `c_comment`    STRING NOT NULL,
    `dt`           STRING NOT NULL,
    PRIMARY KEY (`c_custkey`, `dt`) NOT ENFORCED
)
PARTITIONED BY (`dt`)
WITH (
    'table.auto-partition.enabled' = 'true',
    'table.auto-partition.time-unit' = 'year'
);
To perform a lookup join against a Fluss partitioned primary key table, the join condition must cover all primary key columns, including the partition key.
-- The join condition covers the full primary key of customer_partitioned:
-- c_custkey plus the partition key dt.
INSERT INTO lookup_join_sink
SELECT
    `o`.`o_orderkey`,
    `o`.`o_totalprice`,
    `c`.`c_name`,
    `c`.`c_address`
FROM (
    SELECT `orders`.*, proctime() AS ptime
    FROM `orders`
) AS `o`
LEFT JOIN `customer_partitioned` FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
    ON `o`.`o_custkey` = `c`.`c_custkey`
    AND `o`.`o_dt` = `c`.`dt`;
For more details about Fluss partitioned tables, see Partitioned Tables.
-- Prefix lookup joins also run in asynchronous mode by default. To run one in
-- synchronous mode, set the SQL hint 'lookup.async' = 'false' (see the second
-- INSERT below).
USE CATALOG fluss_catalog;

-- IF NOT EXISTS: my_db may already exist (for example, from the previous
-- lookup join example), and a plain CREATE DATABASE would fail.
CREATE DATABASE IF NOT EXISTS my_db;

USE my_db;

-- Probe (streaming) side of the prefix lookup joins below.
CREATE TABLE `fluss_catalog`.`my_db`.`orders_with_dt` (
    `o_orderkey`      INT NOT NULL,
    `o_custkey`       INT NOT NULL,
    `o_orderstatus`   CHAR(1) NOT NULL,
    `o_totalprice`    DECIMAL(15, 2) NOT NULL,
    `o_orderdate`     DATE NOT NULL,
    `o_orderpriority` CHAR(15) NOT NULL,
    `o_clerk`         CHAR(15) NOT NULL,
    `o_shippriority`  INT NOT NULL,
    `o_comment`       STRING NOT NULL,
    `o_dt`            STRING NOT NULL,
    PRIMARY KEY (o_orderkey) NOT ENFORCED
);

-- Primary key: (c_custkey, c_nationkey)
-- Bucket key:  (c_custkey), a prefix of the primary key
CREATE TABLE `fluss_catalog`.`my_db`.`customer_with_bucket_key` (
    `c_custkey`    INT NOT NULL,
    `c_name`       STRING NOT NULL,
    `c_address`    STRING NOT NULL,
    `c_nationkey`  INT NOT NULL,
    `c_phone`      CHAR(15) NOT NULL,
    `c_acctbal`    DECIMAL(15, 2) NOT NULL,
    `c_mktsegment` CHAR(10) NOT NULL,
    `c_comment`    STRING NOT NULL,
    PRIMARY KEY (`c_custkey`, `c_nationkey`) NOT ENFORCED
) WITH (
    'bucket.key' = 'c_custkey'
);

-- Sink using the 'blackhole' connector (rows are discarded); it only serves
-- to drive the example join queries.
CREATE TEMPORARY TABLE prefix_lookup_join_sink (
    order_key        INT NOT NULL,
    order_totalprice DECIMAL(15, 2) NOT NULL,
    customer_name    STRING NOT NULL,
    customer_address STRING NOT NULL
) WITH ('connector' = 'blackhole');

-- Prefix lookup join in asynchronous mode (the default).
-- The join key (c_custkey) is a prefix of the dimension table's primary keys.
INSERT INTO prefix_lookup_join_sink
SELECT
    `o`.`o_orderkey`,
    `o`.`o_totalprice`,
    `c`.`c_name`,
    `c`.`c_address`
FROM (
    SELECT `orders_with_dt`.*, proctime() AS ptime
    FROM `orders_with_dt`
) AS `o`
LEFT JOIN `customer_with_bucket_key` FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
    ON `o`.`o_custkey` = `c`.`c_custkey`;

-- Prefix lookup join in synchronous mode, selected per table via the OPTIONS hint.
INSERT INTO prefix_lookup_join_sink
SELECT
    `o`.`o_orderkey`,
    `o`.`o_totalprice`,
    `c`.`c_name`,
    `c`.`c_address`
FROM (
    SELECT `orders_with_dt`.*, proctime() AS ptime
    FROM `orders_with_dt`
) AS `o`
LEFT JOIN `customer_with_bucket_key` /*+ OPTIONS('lookup.async' = 'false') */
    FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
    ON `o`.`o_custkey` = `c`.`c_custkey`;
Continuing from the previous prefix lookup example, suppose the dimension table is a Fluss partitioned primary key table, defined as follows:
-- Primary key: (c_custkey, c_nationkey, dt), including the partition column dt
-- Bucket key:  (c_custkey)
-- Partitions are created automatically with a yearly time unit.
CREATE TABLE `fluss_catalog`.`my_db`.`customer_partitioned_with_bucket_key` (
    `c_custkey`    INT NOT NULL,
    `c_name`       STRING NOT NULL,
    `c_address`    STRING NOT NULL,
    `c_nationkey`  INT NOT NULL,
    `c_phone`      CHAR(15) NOT NULL,
    `c_acctbal`    DECIMAL(15, 2) NOT NULL,
    `c_mktsegment` CHAR(10) NOT NULL,
    `c_comment`    STRING NOT NULL,
    `dt`           STRING NOT NULL,
    PRIMARY KEY (`c_custkey`, `c_nationkey`, `dt`) NOT ENFORCED
)
PARTITIONED BY (`dt`)
WITH (
    'bucket.key' = 'c_custkey',
    'table.auto-partition.enabled' = 'true',
    'table.auto-partition.time-unit' = 'year'
);
To perform a prefix lookup against a Fluss partitioned primary key table, the join key must consist of a prefix of the primary key columns (excluding the partition key) plus the partition key.
-- The join key is a prefix of the dimension table's primary keys (excluding
-- the partition key), i.e. c_custkey, plus the partition key dt.
INSERT INTO prefix_lookup_join_sink
SELECT
    `o`.`o_orderkey`,
    `o`.`o_totalprice`,
    `c`.`c_name`,
    `c`.`c_address`
FROM (
    SELECT `orders_with_dt`.*, proctime() AS ptime
    FROM `orders_with_dt`
) AS `o`
LEFT JOIN `customer_partitioned_with_bucket_key` FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
    ON `o`.`o_custkey` = `c`.`c_custkey`
    AND `o`.`o_dt` = `c`.`dt`;
For more details about Fluss partitioned tables, see Partitioned Tables.
Fluss lookup join supports various configuration options. For more details, please refer to the Connector Options page.