---
title: "SQL Query"
weight: 4
type: docs
aliases:
- /spark/sql-query.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# SQL Query
Like any other table, a Paimon table can be queried with a `SELECT` statement.
## Batch Query
Paimon's batch read returns all the data in a snapshot of the table. By default, batch reads return the latest snapshot.
```sql
-- read all columns
SELECT * FROM t;
```
Paimon also supports reading hidden metadata columns. The following columns are currently available:
- `__paimon_file_path`: the file path of the record.
- `__paimon_partition`: the partition of the record.
- `__paimon_bucket`: the bucket of the record.
- `__paimon_row_index`: the row index of the record.
```sql
-- read all columns plus the file path, partition, bucket and row index of each record
SELECT *, __paimon_file_path, __paimon_partition, __paimon_bucket, __paimon_row_index FROM t;
```
### Batch Time Travel
Paimon batch reads with time travel can specify a snapshot or a tag and read the corresponding data.
Time travel requires Spark 3.3+.
You can use `VERSION AS OF` and `TIMESTAMP AS OF` in a query to do time travel:
```sql
-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t VERSION AS OF 1;
-- read the snapshot from specified timestamp
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';
-- read the snapshot from specified timestamp in unix seconds
SELECT * FROM t TIMESTAMP AS OF 1678883047;
-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';
-- read the snapshot at the specified watermark; this matches the first snapshot after the watermark
SELECT * FROM t VERSION AS OF 'watermark-1678883047356';
```
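The numeric literals in the queries above are epoch values, and it is easy to mix up their units. The following minimal sketch converts them to readable instants with `java.time`. `EpochSketch` is a hypothetical name, and reading the number after `watermark-` as epoch milliseconds is an assumption based on its 13 digits.

```scala
import java.time.Instant

object EpochSketch {
  // TIMESTAMP AS OF 1678883047 is given in Unix seconds.
  val fromSeconds: Instant = Instant.ofEpochSecond(1678883047L)

  // Assumption: the number after "watermark-" is an epoch value in milliseconds.
  val watermarkMillis: Long =
    "watermark-1678883047356".stripPrefix("watermark-").toLong
  val fromWatermark: Instant = Instant.ofEpochMilli(watermarkMillis)

  def main(args: Array[String]): Unit = {
    println(fromSeconds)   // 2023-03-15T12:24:07Z
    println(fromWatermark) // 2023-03-15T12:24:07.356Z
  }
}
```

Both values are printed in UTC.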
{{< hint warning >}}
If a tag's name is a number that equals a snapshot id, the `VERSION AS OF` syntax resolves the tag first. For example, if
you have a tag named '1' based on snapshot 2, the statement `SELECT * FROM t VERSION AS OF '1'` actually queries snapshot 2
instead of snapshot 1.
{{< /hint >}}
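The precedence rule in the hint can be pictured with a small model. This is only an illustrative sketch, not Paimon's resolution code: `VersionResolutionSketch`, `TableState` and `resolve` are hypothetical names, and the table state is reduced to a tag map and a set of snapshot ids.

```scala
import scala.util.Try

object VersionResolutionSketch {
  // Hypothetical model: tag name -> id of the snapshot the tag is based on.
  final case class TableState(tags: Map[String, Long], snapshotIds: Set[Long])

  // Mirrors the rule in the hint: look for a tag first,
  // then fall back to interpreting the version as a snapshot id.
  def resolve(version: String, state: TableState): Option[Long] =
    state.tags.get(version).orElse(
      Try(version.toLong).toOption.filter(state.snapshotIds.contains)
    )
}
```

With a tag named `1` based on snapshot 2, `resolve("1", ...)` yields snapshot 2. Without that tag, the same call yields snapshot 1.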
### Batch Incremental
Read the incremental changes between a start snapshot (exclusive) and an end snapshot.
For example:
- '5,10' means the changes between snapshot 5 and snapshot 10.
- 'TAG1,TAG3' means the changes between TAG1 and TAG3.
By default, tables that produce changelog files are read from those changelog files. For other tables, the newly changed files are scanned.
You can also force a scan mode by setting `'incremental-between-scan-mode'`.
In Spark SQL, incremental queries are implemented as table-valued functions:
```sql
-- read the incremental data between snapshot id 12 and snapshot id 20.
SELECT * FROM paimon_incremental_query('tableName', 12, 20);
-- read the incremental data between ts 1692169000000 and ts 1692169900000.
SELECT * FROM paimon_incremental_between_timestamp('tableName', '1692169000000', '1692169900000');
SELECT * FROM paimon_incremental_between_timestamp('tableName', '2025-03-12 00:00:00', '2025-03-12 00:08:00');
-- read the incremental data to tag '2024-12-04'.
-- Paimon will find an earlier tag and return changes between them.
-- If the tag doesn't exist or the earlier tag doesn't exist, return empty.
SELECT * FROM paimon_incremental_to_auto_tag('tableName', '2024-12-04');
```
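To make the exclusive start concrete, here is a minimal sketch that lists which snapshot ids contribute changes for a range such as '5,10'. `IncrementalRangeSketch` and `snapshotsBetween` are hypothetical names. Treating the end snapshot as inclusive is an assumption, and tag-based ranges are not modeled.

```scala
object IncrementalRangeSketch {
  // Snapshot ids whose changes fall in a range written as "start,end".
  // The start is exclusive; the inclusive end is an assumption.
  def snapshotsBetween(range: String): List[Long] =
    range.split(",").map(_.trim.toLong) match {
      case Array(start, end) => ((start + 1) to end).toList
      case _ =>
        throw new IllegalArgumentException(s"expected 'start,end' but got '$range'")
    }
}
```

Under these assumptions, '5,10' covers the changes written by snapshots 6 to 10, and the state as of snapshot 5 is the baseline.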
In batch SQL, `DELETE` records cannot be returned, so `-D` records are dropped.
If you want to see `DELETE` records, you can query the audit_log table.
## Streaming Query
{{< hint info >}}
Paimon currently supports Spark 3.3+ for streaming read.
{{< /hint >}}
Paimon supports a rich set of scan modes for streaming reads:
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Scan Mode</th>
<th class="text-left" style="width: 60%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>latest</h5></td>
<td>For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. </td>
</tr>
<tr>
<td><h5>latest-full</h5></td>
<td>For streaming sources, produces the latest snapshot on the table upon first startup, and continues to read the latest changes.</td>
</tr>
<tr>
<td><h5>from-timestamp</h5></td>
<td>For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. </td>
</tr>
<tr>
<td><h5>from-snapshot</h5></td>
<td>For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. </td>
</tr>
<tr>
<td><h5>from-snapshot-full</h5></td>
<td>For streaming sources, produces the snapshot specified by "scan.snapshot-id" on the table upon first startup, and then continuously reads changes.</td>
</tr>
<tr>
<td><h5>default</h5></td>
<td>It is equivalent to from-snapshot if "scan.snapshot-id" is specified, and to from-timestamp if "scan.timestamp-millis" is specified. Otherwise, it is equivalent to latest-full.</td>
</tr>
</tbody>
</table>
A simple example with default scan mode:
```scala
// No scan-related options are provided, so the latest-full scan mode is used.
val query = spark.readStream
  .format("paimon")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()
```
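How the `default` scan mode falls back to the other modes can be written as a small decision function. This is a sketch of the rule in the table, not Paimon's code: `ScanModeSketch` and `effectiveDefault` are hypothetical names, and the behavior when both options are set is an assumption.

```scala
object ScanModeSketch {
  // Models the "default" row of the scan mode table: derive the effective
  // mode from the options that are present.
  // Checking "scan.snapshot-id" first only follows the order of the table's
  // wording; which option wins when both are set is an assumption.
  def effectiveDefault(options: Map[String, String]): String =
    if (options.contains("scan.snapshot-id")) "from-snapshot"
    else if (options.contains("scan.timestamp-millis")) "from-timestamp"
    else "latest-full"
}
```

With an empty option map, as in the example above, the function returns `latest-full`.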
Paimon Structured Streaming also supports a variety of streaming read modes, including multiple triggers and read limits.
The following read limits are supported:
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>read.stream.maxFilesPerTrigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The maximum number of files returned in a single batch.</td>
</tr>
<tr>
<td><h5>read.stream.maxBytesPerTrigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>The maximum number of bytes returned in a single batch.</td>
</tr>
<tr>
<td><h5>read.stream.maxRowsPerTrigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>The maximum number of rows returned in a single batch.</td>
</tr>
<tr>
<td><h5>read.stream.minRowsPerTrigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>The minimum number of rows returned in a single batch. Used together with read.stream.maxTriggerDelayMs to create a MinRowsReadLimit.</td>
</tr>
<tr>
<td><h5>read.stream.maxTriggerDelayMs</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>The maximum delay between two adjacent batches. Used together with read.stream.minRowsPerTrigger to create a MinRowsReadLimit.</td>
</tr>
</tbody>
</table>
**Example: One**
Use `org.apache.spark.sql.streaming.Trigger.AvailableNow()` together with the `read.stream.maxBytesPerTrigger` option defined by Paimon.
```scala
import org.apache.spark.sql.streaming.Trigger

// Trigger.AvailableNow() processes all data available at the start
// of the query in one or more batches, then terminates the query.
// Setting read.stream.maxBytesPerTrigger to 134217728 (128 MB) means
// that each batch processes at most 128 MB of data.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()
```
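The value `134217728` in the example above is simply 128 MB written in bytes. A minimal sketch of that arithmetic follows. `ByteSizeSketch` and `mebibytes` are hypothetical names, not part of Paimon or Spark.

```scala
object ByteSizeSketch {
  // Hypothetical helper: mebibytes to bytes. Long arithmetic avoids
  // Int overflow for sizes of 2 GB and above.
  def mebibytes(n: Long): Long = n * 1024L * 1024L

  // Option values are passed as strings.
  val maxBytesPerTrigger: String = mebibytes(128).toString // "134217728"
}
```

Computing the value this way keeps the intended size readable at the call site.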
**Example: Two**
Use `org.apache.spark.sql.connector.read.streaming.ReadMinRows`.
```scala
// A batch is not triggered until more than 5,000 rows are available,
// unless more than 300 seconds have passed since the previous batch.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.minRowsPerTrigger", "5000")
  .option("read.stream.maxTriggerDelayMs", "300000")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()
```
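The interplay of the two options can be summarized as a single condition. The following is a simplified model of the behavior described in the comment above, not Paimon's implementation. `MinRowsSketch` and `shouldTrigger` are hypothetical names, and treating both thresholds as inclusive is an assumption.

```scala
object MinRowsSketch {
  // Simplified model: a batch fires once enough rows are available
  // or once the maximum delay since the last batch has elapsed.
  // Assumption: both thresholds are inclusive.
  def shouldTrigger(
      availableRows: Long,
      msSinceLastBatch: Long,
      minRows: Long = 5000L,
      maxDelayMs: Long = 300000L): Boolean =
    availableRows >= minRows || msSinceLastBatch >= maxDelayMs
}
```

In this model, 100 pending rows one second after the last batch do not trigger a new batch. The same 100 rows do trigger one after 400 seconds.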
Paimon Structured Streaming supports reading rows in changelog form (a rowkind column is added to each row to represent its
change type) in two ways:
- Stream directly from the system audit_log table
- Set `read.changelog` to true (default is false), then stream from the table location
**Example:**
```scala
// Option 1
val query1 = spark.readStream
  .format("paimon")
  .table("`table_name$audit_log`")
  .writeStream
  .format("console")
  .start()
// Option 2
val query2 = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()
/*
+I 1 Hi
+I 2 Hello
*/
```
## Query Optimization
It is highly recommended to specify partition and primary key filters
in the query, because they let Paimon skip more data and speed up the query.
The filter predicates that can accelerate data skipping are:
- `=`
- `<`
- `<=`
- `>`
- `>=`
- `IN (...)`
- `LIKE 'abc%'`
- `IS NULL`
Paimon sorts the data by primary key, which speeds up point queries
and range queries. When using a composite primary key, it is best for the query
filters to form a [leftmost prefix](https://dev.mysql.com/doc/refman/5.7/en/multiple-column-indexes.html)
of the primary key for good acceleration.
Suppose that a table has the following specification:
```sql
CREATE TABLE orders (
    catalog_id BIGINT,
    order_id BIGINT,
    .....,
) TBLPROPERTIES (
    'primary-key' = 'catalog_id,order_id'
);
```
The following queries are accelerated well because their filters cover
a leftmost prefix of the primary key.
```sql
SELECT * FROM orders WHERE catalog_id=1025;
SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;
SELECT * FROM orders
  WHERE catalog_id=1025
  AND order_id>2035 AND order_id<6000;
```
However, the following filters cannot accelerate the query well.
```sql
SELECT * FROM orders WHERE order_id=29495;
SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
```
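The leftmost-prefix rule can be checked mechanically. The sketch below counts how many leading primary key columns are covered by the columns that appear in AND-ed filters. It illustrates the rule only and is not how Paimon plans queries. `LeftmostPrefixSketch` and `coveredPrefix` are hypothetical names.

```scala
object LeftmostPrefixSketch {
  // Number of leading primary-key columns that are constrained by
  // AND-ed filters. A result of 0 means no leftmost prefix is covered.
  def coveredPrefix(primaryKey: Seq[String], filteredColumns: Set[String]): Int =
    primaryKey.takeWhile(filteredColumns.contains).length
}
```

For the `orders` table, filtering on `catalog_id` covers one key column, and filtering on both columns covers two. Filtering only on `order_id` covers none, which matches the slow query above. A filter joined with `OR` does not restrict `catalog_id` on its own, so it falls outside this model.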