---
title: "SQL Write"
weight: 2
type: docs
aliases:
---
```sql
INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };
```
For more information, please refer to the syntax documentation.
Use INSERT INTO to apply records and changes to tables.
```sql
INSERT INTO my_table SELECT ...
```
INSERT INTO supports both batch and streaming mode. In streaming mode, by default, the Flink sink will also perform compaction, snapshot expiration, and even partition expiration (if configured).
If multiple jobs write to the same table, you can refer to [dedicated compaction job]({{< ref "maintenance/dedicated-compaction#dedicated-compaction-job" >}}) for more info.
In Paimon, clustering is a feature that allows you to cluster data in your [Append Table]({{< ref "append-table/overview" >}}) based on the values of certain columns during the write process. This organization of data can significantly enhance the efficiency of downstream tasks when reading the data, as it enables faster and more targeted data retrieval. This feature is only supported for [Append Table]({{< ref "append-table/overview" >}}) (bucket = -1) and batch execution mode.
To utilize clustering, you can specify the columns you want to cluster when creating or writing to a table. Here's a simple example of how to enable clustering:
```sql
CREATE TABLE my_table (
    a STRING,
    b STRING,
    c STRING
) WITH (
    'sink.clustering.by-columns' = 'a,b'
);
```
You can also use SQL hints to dynamically set clustering options:
```sql
INSERT INTO my_table /*+ OPTIONS('sink.clustering.by-columns' = 'a,b') */ SELECT * FROM source;
```
The data is clustered using an automatically chosen strategy (such as ORDER, ZORDER, or HILBERT), but you can manually specify the clustering strategy by setting the `sink.clustering.strategy` option. Clustering relies on sampling and sorting. If the clustering process takes too long, you can decrease the total sample count by setting `sink.clustering.sample-factor`, or disable the sorting step by setting `sink.clustering.sort-in-cluster` to `false`.
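For example, the strategy and sampling options can be set together with the clustering columns via a SQL hint. The option values below ('zorder', '20') are illustrative, not recommendations:

```sql
INSERT INTO my_table
/*+ OPTIONS(
    'sink.clustering.by-columns' = 'a,b',
    'sink.clustering.strategy' = 'zorder',
    'sink.clustering.sample-factor' = '20') */
SELECT * FROM source;
```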
You can refer to [FlinkConnectorOptions]({{< ref "maintenance/configurations#flinkconnectoroptions" >}}) for more info about the configurations above.
For unpartitioned tables, Paimon supports overwriting the whole table (this also works for partitioned tables with the `dynamic-partition-overwrite` option disabled).
Use INSERT OVERWRITE to overwrite the whole unpartitioned table.
```sql
INSERT OVERWRITE my_table SELECT ...
```
For partitioned tables, Paimon supports overwriting a partition.
Use INSERT OVERWRITE to overwrite a partition.
```sql
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
```
Flink's default overwrite mode is dynamic partition overwrite (that is, Paimon only deletes the partitions that appear in the overwritten data). You can configure `dynamic-partition-overwrite` to change it to static overwrite.
```sql
-- MyTable is a Partitioned Table

-- Dynamic overwrite
INSERT OVERWRITE my_table SELECT ...

-- Static overwrite (Overwrite whole table)
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...
```
{{< tabs "truncate-tables-syntax" >}}
{{< tab "Flink 1.17-" >}}
You can use INSERT OVERWRITE to purge tables by inserting empty values.
```sql
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM my_table WHERE false;
```
{{< /tab >}}
{{< tab "Flink 1.18+" >}}
```sql
TRUNCATE TABLE my_table;
```
{{< /tab >}}
{{< /tabs >}}
Currently, Paimon supports two ways to purge partitions.
Like purging tables, you can use INSERT OVERWRITE to purge data of partitions by inserting empty values into them.
Method #1 does not support dropping multiple partitions. If you need to drop multiple partitions, you can submit the drop_partition job through `flink run`.
```sql
-- Syntax
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM my_table WHERE false;

-- The following SQL is an example:
-- table definition
CREATE TABLE my_table (
    k0 INT,
    k1 INT,
    v STRING
) PARTITIONED BY (k0, k1);

-- you can use
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
PARTITION (k0 = 0) SELECT k1, v FROM my_table WHERE false;

-- or
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
PARTITION (k0 = 0, k1 = 0) SELECT v FROM my_table WHERE false;
```
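As noted above, dropping multiple partitions can be done by submitting the drop_partition job through `flink run`. A sketch of such an invocation, where the jar path, warehouse path, and database/table names are illustrative placeholders:

```shell
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    drop_partition \
    --warehouse hdfs:///path/to/warehouse \
    --database my_db \
    --table my_table \
    --partition k0=0,k1=0 \
    --partition k0=1,k1=1
```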
{{< hint info >}} Important table properties setting:
Currently, Paimon supports updating records by using UPDATE in Flink 1.17 and later versions. You can perform UPDATE in Flink's batch mode.
```sql
-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;

-- The following SQL is an example:
-- table definition
CREATE TABLE my_table (
    a STRING,
    b INT,
    c INT,
    PRIMARY KEY (a) NOT ENFORCED
) WITH (
    'merge-engine' = 'deduplicate'
);

-- you can use
UPDATE my_table SET b = 1, c = 2 WHERE a = 'myTable';
```
{{< tabs "delete-from-table" >}}
{{< tab "Flink 1.17+" >}}
{{< hint info >}} Important table properties setting:
```sql
-- Syntax
DELETE FROM table_identifier WHERE conditions;

-- The following SQL is an example:
-- table definition
CREATE TABLE my_table (
    id BIGINT NOT NULL,
    currency STRING,
    rate BIGINT,
    dt STRING,
    PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'merge-engine' = 'deduplicate'
);

-- you can use
DELETE FROM my_table WHERE currency = 'UNKNOWN';
```
{{< /tab >}}
{{< /tabs >}}
For partitioned tables, each partition may need to be scheduled to trigger downstream batch computation. Therefore, it is necessary to choose the right timing to indicate that a partition is ready for scheduling, and to minimize the amount of data drift during scheduling. We call this process "Partition Mark Done".
Example to mark done:
```sql
CREATE TABLE my_partitioned_table (
    f0 INT,
    f1 INT,
    f2 INT,
    ...
    dt STRING
) PARTITIONED BY (dt) WITH (
    'partition.timestamp-formatter' = 'yyyyMMdd',
    'partition.timestamp-pattern' = '$dt',
    'partition.time-interval' = '1 d',
    'partition.idle-time-to-done' = '15 m',
    'partition.mark-done-action' = 'done-partition'
);
```
You can also customize a PartitionMarkDoneAction to mark the partition completed.
Define a class `CustomPartitionMarkDoneAction` that implements the `PartitionMarkDoneAction` interface.
```java
package org.apache.paimon;

public class CustomPartitionMarkDoneAction implements PartitionMarkDoneAction {

    @Override
    public void markDone(String partition) {
        // do something.
    }

    @Override
    public void close() {}
}
```
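To wire up a custom action, the table options typically point at the implementation class, roughly as below. The option key `partition.mark-done-action.custom.class` follows Paimon's configuration naming conventions but should be verified against your Paimon version:

```sql
CREATE TABLE my_partitioned_table (
    f0 INT,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'partition.idle-time-to-done' = '15 m',
    'partition.mark-done-action' = 'custom',
    'partition.mark-done-action.custom.class' = 'org.apache.paimon.CustomPartitionMarkDoneAction'
);
```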
Paimon also supports the http-report partition mark done action; this action reports the partition to a remote HTTP server.
HTTP POST request body:

```json
{
  "table": "table fullName",
  "path": "table location path",
  "partition": "mark done partition",
  "params": "custom params"
}
```
HTTP response body:

```json
{
  "result": "success"
}
```
The default 'success-file' action writes a _SUCCESS file to the partition; its content includes creationTime and modificationTime, which can help you understand if there is any delayed data. You can also configure other actions, like 'done-partition'; for example, partition 'dt=20240501' will produce a 'dt=20240501.done' done partition.