---
title: "SQL Lookup"
weight: 6
type: docs
aliases:
- /flink/sql-lookup.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Lookup Joins
[Lookup Joins](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/) are a type of join in streaming queries, used to enrich a table with data that is queried from Paimon. The join requires one table to have a processing time attribute and the other table to be backed by a lookup source connector.
Paimon supports lookup joins on primary key tables and append tables in Flink. The following example illustrates this feature.
## Prepare
First, let's create a Paimon table and update it in real-time.
```sql
-- Create a Paimon catalog
CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://nn:8020/warehouse/path' -- or 'file://tmp/foo/bar'
);

USE CATALOG my_catalog;

-- Create a table in the Paimon catalog
CREATE TABLE customers (
    id INT PRIMARY KEY NOT ENFORCED,
    name STRING,
    country STRING,
    zip STRING
);

-- Launch a streaming job to update the customers table
INSERT INTO customers ...

-- Create a temporary left table, e.g. backed by Kafka
CREATE TEMPORARY TABLE orders (
    order_id INT,
    total INT,
    customer_id INT,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    'format' = 'csv'
    ...
);
```
## Normal Lookup
You can now use `customers` in a lookup join query.
```sql
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```
## Retry Lookup
If a record from `orders` (the main table) fails to join because the corresponding data in `customers` (the lookup table) is not yet ready,
you can consider using Flink's [Delayed Retry Strategy For Lookup](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/#3-enable-delayed-retry-strategy-for-lookup).
This requires Flink 1.16+.
```sql
-- enrich each order with customer information
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```
## Async Retry Lookup
The problem with synchronous retry is that one record blocks all subsequent records, which can stall the entire job.
You can use async lookup together with the `allow_unordered` output mode to avoid this: records that miss the join no longer block
other records.
```sql
-- enrich each order with customer information
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```
{{< hint info >}}
If the main table (`orders`) is a CDC stream, `allow_unordered` will be ignored by Flink SQL (it only supports append streams),
and your streaming job may be blocked. You can use Paimon's `audit_log` system table to work around this by converting
the CDC stream into an append stream.
{{< /hint >}}
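As a minimal sketch: assuming the main table were itself a Paimon CDC table (a hypothetical `orders_cdc`), reading through its `audit_log` system table yields an append-only stream that carries the change type in a `rowkind` column.
```sql
-- Hypothetical Paimon CDC table `orders_cdc`: its `audit_log` system
-- table exposes the changelog as an append stream, keeping the change
-- type in a `rowkind` column ('+I', '-U', '+U', '-D') instead of
-- emitting retractions, so `allow_unordered` can take effect.
SELECT * FROM orders_cdc$audit_log WHERE rowkind = '+I';
```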
## Large Scale Lookup (Fixed Bucket)
By default, each Flink subtask stores a complete copy of the lookup table. If the data in `customers`
(the lookup table) is too large for a single subtask, you can enable the shuffle lookup optimization as follows
(requires Flink 2.0+ and a fixed-bucket Paimon table). This optimization sends data of the same bucket to designated
subtask(s), so each Flink subtask only needs to store part of the data.
```sql
-- enrich each order with customer information
SELECT /*+ LOOKUP('table'='c', 'shuffle'='true') */
o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```
Note that this optimization only takes effect when the join keys contain all the bucket keys.
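For reference, a minimal sketch of a fixed-bucket variant of the `customers` table that satisfies this condition (the `'bucket'` value is an arbitrary example):
```sql
-- A fixed-bucket table definition ('bucket' = '4' is an arbitrary
-- example value). For a primary key table the bucket key defaults to
-- the primary key `id`, which the join condition `o.customer_id = c.id`
-- above contains, so the shuffle optimization can take effect.
CREATE TABLE customers (
    id INT PRIMARY KEY NOT ENFORCED,
    name STRING,
    country STRING,
    zip STRING
) WITH (
    'bucket' = '4'
);
```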
## Dynamic Partition
In traditional data warehouses, each partition often holds the latest full snapshot of the data, so downstream jobs only
need to join against the latest partition. Paimon provides the `max_pt` feature specifically for this scenario.
**Create Paimon Partitioned Table**
```sql
CREATE TABLE customers (
    id INT,
    name STRING,
    country STRING,
    zip STRING,
    dt STRING,
    PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt);
```
**Lookup Join**
```sql
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers /*+ OPTIONS('scan.partitions'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```
The lookup node automatically refreshes to the latest partition and queries its data.
The option `scan.partitions` can also specify fixed partitions in the form `key1=value1,key2=value2`;
multiple partitions are separated by semicolons (`;`).
When fixed partitions are specified, this option can also be used in batch joins, as the sketch below shows.
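A minimal sketch, assuming `dt` partitions `20240101` and `20240102` exist (the partition values are hypothetical):
```sql
-- Look up against two fixed partitions, separated by a semicolon.
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers /*+ OPTIONS('scan.partitions'='dt=20240101;dt=20240102') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```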
The option `scan.partitions` can also specify `max_pt()` for a parent partition, in the form `key1=max_pt(),key2=max_pt()`;
all subpartitions of the latest parent partition will then be loaded. For example, if the partition keys are `year`, `day`, `hh`,
you can specify `year=max_pt()`: Paimon finds the latest `year` partition and loads all of its subpartitions for lookup.
Partitions can only be specified hierarchically; a setting like `year=max_pt(),hh=max_pt()` makes no sense because it skips `day`.
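A minimal sketch, assuming `customers` were instead partitioned by `year`, `day`, `hh`:
```sql
-- Load every (day, hh) subpartition under the latest year.
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers /*+ OPTIONS('scan.partitions'='year=max_pt()') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```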
## Query Service
You can run a Flink streaming job to start a query service for the table. When the query service exists, Flink lookup joins
will prioritize fetching data from it, which effectively improves query performance.
{{< tabs "query-service" >}}
{{< tab "Flink SQL" >}}
```sql
CALL sys.query_service('database_name.table_name', parallelism);
```
{{< /tab >}}
{{< tab "Flink Action" >}}
```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    query_service \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    [--parallelism <parallelism>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
{{< /tab >}}
{{< /tabs >}}