---
title: "Action Jars"
weight: 98
type: docs
aliases:
---
After the Flink local cluster has been started, you can execute an action jar with the following command:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    <action> <args>
```
The following command is used to compact a table:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    compact \
    --path <TABLE_PATH>
```
Paimon supports "MERGE INTO" by submitting a 'merge_into' job through `flink run`.
{{< hint info >}} Important table properties setting:
The design references the following syntax:

```sql
MERGE INTO target-table
  USING source_table | source-expr AS source-alias
  ON merge-condition
  WHEN MATCHED [AND matched-condition]
    THEN UPDATE SET xxx
  WHEN MATCHED [AND matched-condition]
    THEN DELETE
  WHEN NOT MATCHED [AND not_matched_condition]
    THEN INSERT VALUES (xxx)
  WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
    THEN UPDATE SET xxx
  WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
    THEN DELETE
```
The merge_into action uses "upsert" semantics instead of "update": if the row exists, it is updated; otherwise, it is inserted. For example, on a table without primary keys you can update every column, but on a primary-key table, updating the primary keys means inserting a new row whose primary keys differ from those of the existing rows. In this scenario, "upsert" is useful.
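The difference can be sketched with a bash associative array standing in for a primary-key table (purely illustrative, not Paimon internals):

```shell
#!/usr/bin/env bash
# Sketch of upsert semantics: the array key plays the role of the primary key.
declare -A table=([1]="apple" [2]="banana")

# Upsert: insert the row if the key is absent, update it if the key is present.
# A plain "update" would only touch rows whose key already exists.
upsert() {
  table["$1"]="$2"
}

upsert 2 "cherry"   # key 2 exists -> the row is updated
upsert 3 "date"     # key 3 is absent -> a new row is inserted
echo "2=${table[2]} 3=${table[3]}"
```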
Run the following command to submit a 'merge_into' job for the table:
```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    merge_into \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <target-table> \
    [--target_as <target-table-alias>] \
    --source_table <source_table-name> \
    [--source_sql <sql> ...] \
    --on <merge-condition> \
    --merge_actions <matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete> \
    --matched_upsert_condition <matched-condition> \
    --matched_upsert_set <upsert-changes> \
    --matched_delete_condition <matched-condition> \
    --not_matched_insert_condition <not-matched-condition> \
    --not_matched_insert_values <insert-values> \
    --not_matched_by_source_upsert_condition <not-matched-by-source-condition> \
    --not_matched_by_source_upsert_set <not-matched-upsert-changes> \
    --not_matched_by_source_delete_condition <not-matched-by-source-condition> \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```

You can pass SQL statements with `--source_sql <sql> [--source_sql <sql> ...]` to configure the environment and create the source table at runtime.

Examples:

```bash
# Find all orders mentioned in the source table, then mark them as important if the price is
# above 100, or delete them if the price is under 10.
./flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    merge_into \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table T \
    --source_table S \
    --on "T.id = S.order_id" \
    --merge_actions matched-upsert,matched-delete \
    --matched_upsert_condition "T.price > 100" \
    --matched_upsert_set "mark = 'important'" \
    --matched_delete_condition "T.price < 10"

# For matched order rows, increase the price; if there is no match, insert the order from the
# source table:
./flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    merge_into \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table T \
    --source_table S \
    --on "T.id = S.order_id" \
    --merge_actions matched-upsert,not-matched-insert \
    --matched_upsert_set "price = T.price + 20" \
    --not_matched_insert_values *

# For not-matched-by-source order rows (rows that are in the target table and do not match any
# row in the source table based on the merge-condition), decrease the price, or delete them if
# the mark is 'trivial':
./flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    merge_into \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table T \
    --source_table S \
    --on "T.id = S.order_id" \
    --merge_actions not-matched-by-source-upsert,not-matched-by-source-delete \
    --not_matched_by_source_upsert_condition "T.mark <> 'trivial'" \
    --not_matched_by_source_upsert_set "price = T.price - 20" \
    --not_matched_by_source_delete_condition "T.mark = 'trivial'"

# A --source_sql example:
# Create a temporary view S in a new catalog and use it as the source table.
./flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    merge_into \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table T \
    --source_sql "CREATE CATALOG test_cat WITH (...)" \
    --source_sql "CREATE TEMPORARY VIEW test_cat.\`default\`.S AS SELECT order_id, price, 'important' FROM important_order" \
    --source_table test_cat.default.S \
    --on "T.id = S.order_id" \
    --merge_actions not-matched-insert \
    --not_matched_insert_values *
```
Explanation of the term 'matched':
Parameter format:
For more information about 'merge_into', see:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    merge_into --help
```
In Flink 1.16 and earlier versions, Paimon only supports deleting records by submitting a 'delete' job through `flink run`.
Run the following command to submit a 'delete' job for the table:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    delete \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --where <filter_spec> \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```

`filter_spec` is equivalent to the 'WHERE' clause of a SQL DELETE statement. Examples:

```sql
age >= 18 AND age <= 60
animal <> 'cat'
id > (SELECT count(*) FROM employee)
```
For more information about 'delete', see:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    delete --help
```
Run the following command to submit a 'drop_partition' job for the table:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    drop_partition \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    [--partition <partition_spec> [--partition <partition_spec> ...]] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```

`partition_spec` format: `key1=value1,key2=value2...`
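How a `partition_spec` string breaks down into key/value pairs can be illustrated with a small parsing sketch (the spec values and variable names here are hypothetical, not Paimon internals):

```shell
#!/usr/bin/env bash
# Illustrative parsing of a partition_spec such as "dt=2024-01-01,hr=12":
# pairs are comma-separated, and each pair is a key=value assignment.
spec="dt=2024-01-01,hr=12"

declare -A partition
IFS=',' read -ra pairs <<< "$spec"
for kv in "${pairs[@]}"; do
  partition["${kv%%=*}"]="${kv#*=}"   # split on the first '=' into key and value
done

echo "dt=${partition[dt]} hr=${partition[hr]}"
```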
For more information about 'drop_partition', see:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    drop_partition --help
```
Run the following command to submit a 'rewrite_file_index' job for the table:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    rewrite_file_index \
    --warehouse <warehouse-path> \
    --identifier <database.table> \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
For more information about 'rewrite_file_index', see:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    rewrite_file_index --help
```