blob: da982508193df6b5c44fb5cd143b1d8458fb2ae8 [file] [log] [blame] [view]
---
{
"title": "Batch Deletion",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
Why do we need to introduce import-based Batch Delete when we have the Delete operation?
- **Limitations of Delete operation**
When you delete by Delete statement, each execution of Delete generates an empty rowset to record the deletion conditions and a new version of the data. Each time you read, you have to filter the deletion conditions. If you delete too often or have too many deletion conditions, it will seriously affect the query performance.
- **Insert data interspersed with Delete data**
For scenarios like importing data from a transactional database via CDC, Insert and Delete are usually interspersed in the data. In this case, the current Delete operation cannot be implemented.
When importing data, there are several ways to merge it:
1. APPEND: Append all data to existing data.
2. DELETE: Delete all rows that have the same value as the key column of the imported data (when a `sequence` column exists in the table, it is necessary to satisfy the logic of having the same primary key as well as the size of the sequence column in order to delete it correctly, see Use Case 4 below for details).
3. MERGE: APPEND or DELETE according to DELETE ON decision
:::caution Warning
Batch Delete only works on Unique models.
:::
## Fundamental
This is achieved by adding a hidden column `__DORIS_DELETE_SIGN__` to the Unique table.
When FE parses the query, `__DORIS_DELETE_SIGN__` is removed when it encounters * and so on, and `__DORIS_DELETE_SIGN__ !` `= true`, BE will add a column for judgement when reading, and determine whether to delete by the condition.
- Import
On import, the value of the hidden column is set to the value of the `DELETE ON` expression during the FE parsing stage.
- Read
The read adds `__DORIS_DELETE_SIGN__ !` `= true` condition, BE does not sense this process and executes normally.
- Cumulative Compaction
In Cumulative Compaction, hidden columns are treated as normal columns and the Compaction logic remains unchanged.
- Base Compaction
When Base Compaction is performed, the rows marked for deletion are deleted to reduce the space occupied by the data.
## Syntax Description
The syntax design of the import is mainly to add a column mapping that specifies the field of the delete marker column, and it is necessary to add a column to the imported data. The syntax of various import methods is as follows:
### Stream Load
The writing method of `Stream Load` adds a field to set the delete label column in the columns field in the header. Example: `-H "columns: k1, k2, label_c3" -H "merge_type: [MERGE|APPEND|DELETE]" -H "delete: label_c3=1"`
### Broker Load
The writing method of `Broker Load` sets the field of the delete marker column at `PROPERTIES`. The syntax is as follows:
```sql
LOAD LABEL db1.label1
(
[MERGE|APPEND|DELETE] DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1,tmp_c2, label_c3)
SET
(
id=tmp_c2,
name=tmp_c1,
)
[DELETE ON label_c3=true]
)
WITH BROKER 'broker'
(
"username"="user",
"password"="pass"
)
PROPERTIES
(
"timeout" = "3600"
);
```
### Routine Load
The writing method of `Routine Load` adds a mapping to the `columns` field. The mapping method is the same as above. The syntax is as follows:
```sql
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
[WITH MERGE|APPEND|DELETE]
COLUMNS(k1, k2, k3, v1, v2, label),
WHERE k1 100 and k2 like "%doris%"
[DELETE ON label=true]
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
```
## Note
1. Since import operations other than stream load may be executed out of order inside doris, if it is not stream load when importing using the `MERGE` method, it needs to be used with load sequence. For the specific syntax, please refer to the `sequence` column related documents
2. `DELETE ON` condition can only be used with MERGE.
:::tip Tip
if session variable `SET show_hidden_columns = true` was executed before running import task to show whether table support batch delete feature, then execute `select count(*) from xxx` statement in the same session after finishing `DELETE/MERGE` import task, it will result in a unexpected result that the statement result set will include the deleted results. To avoid this problem, you should execute `SET show_hidden_columns = false` before selecting statement or open a new session to run the select statement.
:::
## Usage Examples
### Check if Batch Delete Support is Enabled
```sql
mysql> CREATE TABLE IF NOT EXISTS table1 (
-> siteid INT,
-> citycode INT,
-> username VARCHAR(64),
-> pv BIGINT
-> ) UNIQUE KEY (siteid, citycode, username)
-> DISTRIBUTED BY HASH(siteid) BUCKETS 10
-> PROPERTIES (
-> "replication_num" = "3"
-> );
Query OK, 0 rows affected (0.34 sec)
mysql> SET show_hidden_columns=true;
Query OK, 0 rows affected (0.00 sec)
mysql> DESC table1;
+-----------------------+-------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-----------------------+-------------+------+-------+---------+-------+
| siteid | int | Yes | true | NULL | |
| citycode | int | Yes | true | NULL | |
| username | varchar(64) | Yes | true | NULL | |
| pv | bigint | Yes | false | NULL | NONE |
| __DORIS_DELETE_SIGN__ | tinyint | No | false | 0 | NONE |
| __DORIS_VERSION_COL__ | bigint | No | false | 0 | NONE |
+-----------------------+-------------+------+-------+---------+-------+
6 rows in set (0.01 sec)
```
### Stream Load Usage Examples
1. Import data normally:
```shell
curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: APPEND" -T ~/table1_data http://127.0.0.1:8030/api/test/table1/_stream_load
```
The APPEND condition can be omitted, which has the same effect as the following statement:
```shell
curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -T ~/table1_data http://127.0.0.1:8030/api/test/table1/_stream_load
```
2. Delete all data with the same key as the imported data
```Shell
curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: DELETE" -T ~/table1_data http://127.0.0.1:8030/api/test/table1/_stream_load
```
Before load:
```sql
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 3 | 2 | tom | 2 |
| 4 | 3 | bush | 3 |
| 5 | 3 | helen | 3 |
+--------+----------+----------+------+
```
Load data:
```Plain
3,2,tom,0
```
After load:
```sql
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 4 | 3 | bush | 3 |
| 5 | 3 | helen | 3 |
+--------+----------+----------+------+
```
3. Import the same row as the key column of the row with `site_id=1`
```shell
curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: MERGE" -H "delete: siteid=1" -T ~/ table1_data http://127.0.0.1:8030/api/test/table1/_stream_load
```
Before load:
```sql
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 4 | 3 | bush | 3 |
| 5 | 3 | helen | 3 |
| 1 | 1 | jim | 2 |
+--------+----------+----------+------+
```
Load data:
```Plain
2,1,grace,2
3,2,tom,2
1,1,jim,2
```
After load:
```sql
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 4 | 3 | bush | 3 |
| 2 | 1 | grace | 2 |
| 3 | 2 | tom | 2 |
| 5 | 3 | helen | 3 |
+--------+----------+----------+------+
```
4. When the table has the sequence column, delete all data with the same key as the imported data
```shell
curl --location-trusted -u root: -H "column_separator:," -H "columns: name, gender, age" -H "function_column.sequence_col: age" -H "merge_type: DELETE" -T ~/table1_data http://127.0.0.1:8030/api/test/table1/_stream_load
```
When the unique table has the sequence column, sequence column is used as the basis for the replacement order of the REPLACE aggregate function under the same key column, and the larger value can replace the smaller value. If you want delete some data, the imported data must have the same key and the sequence column must be larger or equal than before.
For example, one table like this:
```sql
mysql> SET show_hidden_columns=true;
Query OK, 0 rows affected (0.00 sec)
mysql> DESC table1;
+------------------------+--------------+------+-------+---------+---------+
| Field | Type | Null | Key | Default | Extra |
+------------------------+--------------+------+-------+---------+---------+
| name | VARCHAR(100) | No | true | NULL | |
| gender | VARCHAR(10) | Yes | false | NULL | REPLACE |
| age | INT | Yes | false | NULL | REPLACE |
| __DORIS_DELETE_SIGN__ | TINYINT | No | false | 0 | REPLACE |
| __DORIS_SEQUENCE_COL__ | INT | Yes | false | NULL | REPLACE |
+------------------------+--------------+------+-------+---------+---------+
4 rows in set (0.00 sec)
```
Before load:
```sql
+-------+--------+------+
| name | gender | age |
+-------+--------+------+
| li | male | 10 |
| wang | male | 14 |
| zhang | male | 12 |
+-------+--------+------+
```
If you load data like this:
```Plain
li,male,10
```
After load:
```sql
+-------+--------+------+
| name | gender | age |
+-------+--------+------+
| wang | male | 14 |
| zhang | male | 12 |
+-------+--------+------+
```
You will find that the data is deleted.
```Plain
li,male,10
```
But if you load data like this:
```Plain
li,male,9
```
After load:
```sql
+-------+--------+------+
| name | gender | age |
+-------+--------+------+
| li | male | 10 |
| wang | male | 14 |
| zhang | male | 12 |
+-------+--------+------+
```
You will find that the data is not deleted.
```Plain
li,male,10
```
This is because in the underlying dependencies, it will first judge the case of the same key, display the row data with a large value in the sequence column, and then check whether the `__DORIS_DELETE_SIGN__` value of the row is 1. If it is 1, it will not be displayed. If it is 0, it will still be read out.
:::tip Tip
When data is written and deleted at the same time in the imported data (e.g., in the Flink CDC scenario), using the sequence column can effectively ensure consistency when the data arrives out of order, avoiding the deletion operation of an old version that arrives later, and accidentally deleting the new version of the data that arrives first.
:::