---
title: "SQL Write"
weight: 2
type: docs
aliases:
- /flink/sql-write.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# SQL Write
## Syntax
```sql
INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };
```
For more information, please check the syntax document:
[Flink INSERT Statement](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/insert/)
## INSERT INTO
Use `INSERT INTO` to apply records and changes to tables.
```sql
INSERT INTO my_table SELECT ...
```
`INSERT INTO` supports both batch and streaming mode. In streaming mode, by default, it will also perform compaction,
snapshot expiration, and even partition expiration in the Flink sink (if configured).
If multiple jobs write to the same table, refer to [dedicated compaction job]({{< ref "maintenance/dedicated-compaction#dedicated-compaction-job" >}}) for more info; a writer set up for that case is sketched below.
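For example, when a dedicated compaction job handles maintenance, the writer itself can skip compaction and snapshot expiration via Paimon's `write-only` option (a minimal sketch; the table and source names are illustrative):
```sql
-- This writer only writes data; compaction and snapshot expiration
-- are skipped and left to a dedicated compaction job.
INSERT INTO my_table /*+ OPTIONS('write-only' = 'true') */
SELECT * FROM source;
```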
### Clustering
In Paimon, clustering is a feature that allows you to cluster data in your [Append Table]({{< ref "append-table/overview" >}})
based on the values of certain columns during the write process. Organizing data this way can significantly enhance the efficiency of downstream
tasks when reading the data, as it enables faster and more targeted data retrieval. This feature is only supported for [Append Tables]({{< ref "append-table/overview" >}}) (`bucket = -1`)
in batch execution mode.
To utilize clustering, you can specify the columns you want to cluster when creating or writing to a table. Here's a simple example of how to enable clustering:
```sql
CREATE TABLE my_table (
    a STRING,
    b STRING,
    c STRING
) WITH (
    'sink.clustering.by-columns' = 'a,b'
);
```
You can also use SQL hints to dynamically set clustering options:
```sql
INSERT INTO my_table /*+ OPTIONS('sink.clustering.by-columns' = 'a,b') */
SELECT * FROM source;
```
The data is clustered using an automatically chosen strategy (such as ORDER, ZORDER, or HILBERT), but you can specify the clustering strategy manually
by setting the `sink.clustering.strategy` option. Clustering relies on sampling and sorting. If the clustering process takes too much time, you can decrease
the total sample number by setting `sink.clustering.sample-factor`, or disable the sorting step by setting `sink.clustering.sort-in-cluster` to false.
You can refer to [FlinkConnectorOptions]({{< ref "maintenance/configurations#flinkconnectoroptions" >}}) for more info about the configurations above.
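For example, the strategy and sampling options can be combined in a single hint (a sketch; `zorder` is one of the strategies named above, and the sample factor value is only illustrative):
```sql
INSERT INTO my_table /*+ OPTIONS(
    'sink.clustering.by-columns' = 'a,b',
    'sink.clustering.strategy' = 'zorder',
    'sink.clustering.sample-factor' = '100',
    'sink.clustering.sort-in-cluster' = 'false') */
SELECT * FROM source;
```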
## Overwriting the Whole Table
Paimon supports overwriting the whole table for unpartitioned tables
(or for partitioned tables with the `dynamic-partition-overwrite` option disabled).
Use `INSERT OVERWRITE` to overwrite the whole unpartitioned table.
```sql
INSERT OVERWRITE my_table SELECT ...
```
### Overwriting a Partition
For partitioned tables, Paimon supports overwriting a partition.
Use `INSERT OVERWRITE` to overwrite a partition.
```sql
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
```
### Dynamic Overwrite
Flink's default overwrite mode is dynamic partition overwrite (that is, Paimon only deletes the partitions
that appear in the overwritten data). You can configure `dynamic-partition-overwrite` to change it to static overwrite.
```sql
-- MyTable is a Partitioned Table
-- Dynamic overwrite
INSERT OVERWRITE my_table SELECT ...
-- Static overwrite (Overwrite whole table)
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...
```
## Truncate tables
{{< tabs "truncate-tables-syntax" >}}
{{< tab "Flink 1.17-" >}}
You can use `INSERT OVERWRITE` to purge a table by inserting an empty record set.
```sql
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM my_table WHERE false;
```
{{< /tab >}}
{{< tab "Flink 1.18+" >}}
```sql
TRUNCATE TABLE my_table;
```
{{< /tab >}}
{{< /tabs >}}
## Purging Partitions
Currently, Paimon supports two ways to purge partitions.
1. Like purging tables, you can use `INSERT OVERWRITE` to purge the data of partitions by inserting an empty record set into them.
2. Method #1 does not support dropping multiple partitions. If you need to drop multiple partitions, you can submit a drop_partition job through `flink run`.
```sql
-- Syntax
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM my_table WHERE false;
-- The following SQL is an example:
-- table definition
CREATE TABLE my_table (
    k0 INT,
    k1 INT,
    v STRING
) PARTITIONED BY (k0, k1);
-- you can use
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
PARTITION (k0 = 0) SELECT k1, v FROM my_table WHERE false;
-- or
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
PARTITION (k0 = 0, k1 = 0) SELECT v FROM my_table WHERE false;
```
## Updating tables
{{< hint info >}}
Important table properties setting:
1. Only [primary key tables]({{< ref "primary-key-table/overview" >}}) support this feature.
2. The [MergeEngine]({{< ref "primary-key-table/merge-engine" >}}) needs to be [deduplicate]({{< ref "primary-key-table/merge-engine/overview#deduplicate" >}})
or [partial-update]({{< ref "primary-key-table/merge-engine/partial-update" >}}) to support this feature.
3. Updating primary keys is not supported.
{{< /hint >}}
Currently, Paimon supports updating records by using `UPDATE` in Flink 1.17 and later versions. You can perform `UPDATE` in Flink's `batch` mode.
```sql
-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;
-- The following SQL is an example:
-- table definition
CREATE TABLE my_table (
    a STRING,
    b INT,
    c INT,
    PRIMARY KEY (a) NOT ENFORCED
) WITH (
    'merge-engine' = 'deduplicate'
);
-- you can use
UPDATE my_table SET b = 1, c = 2 WHERE a = 'myTable';
```
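Since `UPDATE` only works in batch mode, you may need to switch the runtime mode first, e.g. in the SQL client (using Flink's standard `execution.runtime-mode` setting):
```sql
-- Switch to batch execution before running UPDATE.
SET 'execution.runtime-mode' = 'batch';

UPDATE my_table SET b = 1, c = 2 WHERE a = 'myTable';
```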
## Deleting from table
{{< tabs "delete-from-table" >}}
{{< tab "Flink 1.17+" >}}
{{< hint info >}}
Important table properties setting:
1. Only primary key tables support this feature.
2. If the table has primary keys, the following [MergeEngine]({{< ref "primary-key-table/merge-engine/overview" >}})s support this feature:
* [deduplicate]({{< ref "primary-key-table/merge-engine/overview#deduplicate" >}}).
* [partial-update]({{< ref "primary-key-table/merge-engine/partial-update" >}}) with the option 'partial-update.remove-record-on-delete' enabled (see the sketch after the example below).
3. Deleting from a table in streaming mode is not supported.
{{< /hint >}}
```sql
-- Syntax
DELETE FROM table_identifier WHERE conditions;
-- The following SQL is an example:
-- table definition
CREATE TABLE my_table (
    id BIGINT NOT NULL,
    currency STRING,
    rate BIGINT,
    dt STRING,
    PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'merge-engine' = 'deduplicate'
);
-- you can use
DELETE FROM my_table WHERE currency = 'UNKNOWN';
```
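For the partial-update merge engine, deletion additionally requires the 'partial-update.remove-record-on-delete' option mentioned above (a minimal sketch; the table definition is illustrative):
```sql
CREATE TABLE my_partial_table (
    id BIGINT NOT NULL,
    v1 STRING,
    v2 STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update',
    'partial-update.remove-record-on-delete' = 'true'
);

DELETE FROM my_partial_table WHERE id = 1;
```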
{{< /tab >}}
{{< /tabs >}}
## Partition Mark Done
For partitioned tables, each partition may need to be scheduled to trigger downstream batch computation. Therefore,
it is necessary to choose the right moment to signal that a partition is ready for scheduling, minimizing the amount of data
drift during scheduling. We call this process "partition mark done".
Example to mark done:
```sql
CREATE TABLE my_partitioned_table (
    f0 INT,
    f1 INT,
    f2 INT,
    ...
    dt STRING
) PARTITIONED BY (dt) WITH (
    'partition.timestamp-formatter' = 'yyyyMMdd',
    'partition.timestamp-pattern' = '$dt',
    'partition.time-interval' = '1 d',
    'partition.idle-time-to-done' = '15 m',
    'partition.mark-done-action' = 'done-partition'
);
```
You can also customize a PartitionMarkDoneAction to mark the partition as completed:
- partition.mark-done-action: custom
- partition.mark-done-action.custom.class: The class implementing the PartitionMarkDoneAction interface (e.g. org.apache.paimon.CustomPartitionMarkDoneAction).
Define a class CustomPartitionMarkDoneAction that implements the PartitionMarkDoneAction interface:
```java
package org.apache.paimon;

public class CustomPartitionMarkDoneAction implements PartitionMarkDoneAction {

    @Override
    public void markDone(String partition) {
        // Invoked when a partition is marked done, e.g. notify an external scheduler.
    }

    @Override
    public void close() {}
}
```
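To wire the custom action into a table, point the options above at the implementation class (a sketch reusing the class name from the example; combine it with the partition.* time options shown earlier):
```sql
CREATE TABLE my_partitioned_table (
    f0 INT,
    dt STRING
) PARTITIONED BY (dt) WITH (
    -- plus the partition.timestamp-* and idle-time options shown earlier
    'partition.mark-done-action' = 'custom',
    'partition.mark-done-action.custom.class' = 'org.apache.paimon.CustomPartitionMarkDoneAction'
);
```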
Paimon also supports the http-report partition mark done action, which reports the partition to a remote HTTP server:
- partition.mark-done-action: http-report
- partition.mark-done-action.http.url: The remote HTTP server URL to which the partition is reported.
- partition.mark-done-action.http.params: HTTP client request params included in the JSON request body.
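Configured on a table, that could look like this (a sketch; the URL and params values are purely illustrative):
```sql
CREATE TABLE my_partitioned_table (
    f0 INT,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'partition.mark-done-action' = 'http-report',
    -- illustrative endpoint and params
    'partition.mark-done-action.http.url' = 'http://example.com:8080/partition-done',
    'partition.mark-done-action.http.params' = 'tag=daily'
);
```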
HTTP POST request body:
```json
{
    "table": "table fullName",
    "path": "table location path",
    "partition": "mark done partition",
    "params": "custom params"
}
```
HTTP response body:
```json
{
    "result": "success"
}
```
1. Firstly, you need to define the time parser of the partition and the time interval between partitions, so that Paimon can
determine when a partition can properly be marked done.
2. Secondly, you need to define the idle time, which determines how long a partition must have received no new data
before it is marked as done.
3. Thirdly, by default, partition mark done creates a _SUCCESS file. The content of the _SUCCESS file is JSON containing
`creationTime` and `modificationTime`, which can help you understand whether there is any delayed data. You can also
configure other actions, like `'done-partition'`; for example, partition `'dt=20240501'` will produce a
`'dt=20240501.done'` done partition.