Doris provides powerful data transformation capabilities during import, which can simplify part of the data processing pipeline and reduce reliance on additional ETL tools. With these built-in transformation features, you can improve import efficiency and ensure consistent data processing logic.
Doris supports the following four data transformation methods during import:
| Transformation Method | Purpose | Execution Timing |
|---|---|---|
| Column mapping | Maps source data columns to different columns of the target table | After data parsing |
| Column transformation | Performs real-time transformation on source data using functions and expressions | After column mapping |
| Pre-filtering | Filters out unneeded raw data before column mapping and column transformation | After data parsing, before column mapping |
| Post-filtering | Filters the final result after column mapping and column transformation | After column transformation |
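The execution timing in the table above can be sketched as a small Python model that runs the four steps in order over comma-separated lines. This is a conceptual illustration written for this guide, not Doris code. The helper run_import and its parameters are hypothetical. Rows are dicts, predicates are Python callables, and type conversion is omitted except that the raw string \N becomes None (NULL) once column mapping has run.

```python
def run_import(raw_lines, column_list, target_columns,
               transforms=None, pre_filter=None, post_filter=None):
    """Toy model of one import: parse -> pre-filter -> map -> transform -> post-filter.

    Hypothetical helper for illustration only; it does not reflect Doris internals.
    """
    transforms = transforms or {}
    result = []
    for line in raw_lines:
        fields = line.split(",")                  # data parsing: one list of strings per line
        raw = dict(zip(column_list, fields))      # name each field after the column list
        if pre_filter and not pre_filter(raw):    # pre-filtering: values are still raw strings
            continue
        # Column mapping: the raw string \N becomes NULL (None) from here on.
        row = {k: (None if v == "\\N" else v) for k, v in raw.items()}
        for target, expr in transforms.items():   # column transformation
            row[target] = expr(row)
        if post_filter and not post_filter(row):  # post-filtering: sees transformed values
            continue
        # Only target-table columns are written; other names are dropped.
        result.append({c: row.get(c) for c in target_columns})
    return result


rows = run_import(
    ["1,100", "2,\\N", "3,300"],
    column_list=["k1", "tmp_k2"],
    target_columns=["k1", "k2"],
    transforms={"k2": lambda r: None if r["tmp_k2"] is None else int(r["tmp_k2"]) * 10},
    pre_filter=lambda raw: raw["k1"] != "1",      # drops line 1 before any transformation
    post_filter=lambda r: r["k2"] is not None,    # drops line 2 after transformation
)
print(rows)  # [{'k1': '3', 'k2': 3000}]
```

The pre-filter only ever sees strings, while the post-filter sees the transformed k2. This is the same split that the rest of this guide relies on.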
Each import method supports the four transformation capabilities as follows:
| Import Method | Column Mapping | Column Transformation | Pre-filtering | Post-filtering |
|---|---|---|---|---|
| Stream Load | Supported | Supported | Not supported | Supported |
| Broker Load | Supported | Supported | Supported | Supported |
| Routine Load | Supported | Supported | Supported | Supported |
| Insert Into | Implemented via SELECT | Implemented via SELECT | Implemented via WHERE | Implemented via WHERE |
Different import methods use different parameters or clauses to declare data transformation logic. The correspondence for each import method is described below.
For Stream Load, data transformation is implemented by setting the following parameters in the HTTP header:
| Parameter | Description |
|---|---|
| columns | Specifies column mapping and column transformation |
| where | Specifies post-filtering |
Note: Stream Load does not support pre-filtering.
Example:
curl --location-trusted -u user:passwd \
    -H "columns: k1, k2, tmp_k3, k3 = tmp_k3 + 1" \
    -H "where: k1 > 1" \
    -T data.csv \
    http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
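In the columns header, bare names perform column mapping and `name = expression` entries perform column transformation. The hypothetical helper below splits a header value into those two groups. Commas inside function calls such as ifnull(tmp_k2, 0) or inside quoted strings do not end an entry. It only illustrates how the header is structured. Doris parses the header itself, and this is not its parser.

```python
def split_columns_header(value):
    """Split a 'columns' header value into mapped names and 'name = expr' assignments.

    Hypothetical helper for illustration; Doris performs its own parsing.
    Commas inside parentheses or quoted strings do not end an entry.
    """
    entries, current, depth, quote = [], [], 0, None
    for ch in value:
        if quote:
            if ch == quote:
                quote = None
        elif ch in "'\"":
            quote = ch
        elif ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        elif ch == "," and depth == 0:
            entries.append("".join(current).strip())
            current = []
            continue
        current.append(ch)
    if "".join(current).strip():
        entries.append("".join(current).strip())

    mapped, assignments = [], {}
    for entry in entries:
        if "=" in entry:                  # first '=' separates target column and expression
            target, expr = entry.split("=", 1)
            assignments[target.strip()] = expr.strip()
        else:                             # bare name: column mapping only
            mapped.append(entry)
    return mapped, assignments


mapped, assignments = split_columns_header("k1, k2, tmp_k3, k3 = tmp_k3 + 1")
print(mapped)       # ['k1', 'k2', 'tmp_k3']
print(assignments)  # {'k3': 'tmp_k3 + 1'}
```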
For Broker Load, data transformation is implemented in the SQL statement through the following clauses:
| Clause | Description |
|---|---|
| column list | Specifies column mapping, in the format (k1, k2, tmp_k3) |
| SET | Specifies column transformation |
| PRECEDING FILTER | Specifies pre-filtering |
| WHERE | Specifies post-filtering |
Example:
LOAD LABEL test_db.label1
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE `test_tbl`
    (k1, k2, tmp_k3)
    SET (
        k3 = tmp_k3 + 1
    )
    PRECEDING FILTER k1 = 1
    WHERE k1 > 1
)
WITH S3 (...);
For Routine Load, data transformation is implemented in the SQL statement through the following clauses:
| Clause | Description |
|---|---|
| COLUMNS | Specifies column mapping and column transformation |
| PRECEDING FILTER | Specifies pre-filtering |
| WHERE | Specifies post-filtering |
Example:
CREATE ROUTINE LOAD test_db.label1 ON test_tbl
COLUMNS(k1, k2, tmp_k3, k3 = tmp_k3 + 1),
PRECEDING FILTER k1 = 1,
WHERE k1 > 1
...
Insert Into can perform data transformation directly in the SELECT statement, and uses the WHERE clause for data filtering.
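With Insert Into, column mapping and transformation are written in the SELECT list and filtering is written in WHERE. The sketch below shows that pattern using Python's built-in sqlite3 module as a stand-in for Doris, so it runs without a cluster. The staging table src and its columns c1 to c4 are hypothetical. Doris-specific details such as types, available functions, and how the source data is read may differ.

```python
import sqlite3

# SQLite stands in for Doris purely to show the INSERT INTO ... SELECT pattern.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE src (c1 INTEGER, c2 INTEGER, c3 TEXT, c4 REAL)")  # hypothetical staging table
conn.executemany("INSERT INTO src VALUES (?, ?, ?, ?)", [
    (1, 100, "beijing", 1.1),
    (2, 200, "shanghai", 1.2),
    (3, 300, "guangzhou", 1.3),
    (4, None, "chongqing", 1.4),
])
conn.execute("CREATE TABLE example_table (k1 INTEGER, k2 INTEGER, k3 INTEGER, k4 REAL)")

# Mapping and transformation live in the SELECT list; filtering lives in WHERE.
conn.execute("""
    INSERT INTO example_table (k1, k2, k3, k4)
    SELECT c1,
           IFNULL(c2, 0),
           CASE c3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2
                   WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 END,
           c4
    FROM src
    WHERE c4 > 1.1
""")
rows = conn.execute("SELECT * FROM example_table ORDER BY k1").fetchall()
print(rows)  # [(2, 200, 2, 1.2), (3, 300, 3, 1.3), (4, 0, 4, 1.4)]
```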
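Column mapping defines the correspondence between source data columns and target table columns. It handles scenarios such as the following: the source columns are in a different order from the target table columns, the source file contains columns that the target table does not need, and the target table contains columns that are missing from the source file.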
The implementation of column mapping can be divided into two core steps:
The following examples show the processing flow for JSON data with jsonpaths, JSON data without jsonpaths, and CSV data:
Assume the following source data, with one JSON object per line:
{"k1":1,"k2":"100","k3":"beijing","k4":1.1}
{"k1":2,"k2":"200","k3":"shanghai","k4":1.2}
{"k1":3,"k2":"300","k3":"guangzhou","k4":1.3}
{"k1":4,"k2":"\\N","k3":"chongqing","k4":1.4}
CREATE TABLE example_table
(
    col1 INT,
    col2 STRING,
    col3 INT,
    col4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(col1)
DISTRIBUTED BY HASH(col1) BUCKETS 1;
curl --location-trusted -u user:passwd \
    -H "columns:col1, col3, col2, col4" \
    -H "jsonpaths:[\"$.k1\", \"$.k2\", \"$.k3\", \"$.k4\"]" \
    -H "format:json" \
    -H "read_json_by_line:true" \
    -T data.json \
    -X PUT \
    http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
LOAD LABEL example_db.label_broker
(
    DATA INFILE("s3://bucket_name/data.json")
    INTO TABLE example_table
    FORMAT AS "json"
    (col1, col3, col2, col4)
    PROPERTIES (
        "jsonpaths" = "[\"$.k1\", \"$.k2\", \"$.k3\", \"$.k4\"]"
    )
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(col1, col3, col2, col4)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.k1\", \"$.k2\", \"$.k3\", \"$.k4\"]",
    "read_json_by_line" = "true"
)
FROM KAFKA (...);
mysql> SELECT * FROM example_table;
+------+-----------+------+------+
| col1 | col2      | col3 | col4 |
+------+-----------+------+------+
|    1 | beijing   |  100 |  1.1 |
|    2 | shanghai  |  200 |  1.2 |
|    3 | guangzhou |  300 |  1.3 |
|    4 | chongqing | NULL |  1.4 |
+------+-----------+------+------+
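With jsonpaths, each path extracts one value, and the values are bound to the columns list by position. The i-th path feeds the i-th column, which is why $.k2 ends up in col3. The Python sketch below models only that positional binding. The helper load_with_jsonpaths is hypothetical, handles only top-level "$.key" paths, and skips casting to the table's column types.

```python
import json


def load_with_jsonpaths(lines, jsonpaths, columns):
    """Extract one value per jsonpath, then bind values to columns by position.

    Hypothetical helper; only top-level "$.key" paths are handled, and casting
    to the table's column types is omitted.
    """
    rows = []
    for line in lines:
        obj = json.loads(line)
        values = [obj.get(path[2:]) for path in jsonpaths]   # "$.k2" -> key "k2"
        row = dict(zip(columns, values))                     # i-th path -> i-th column
        # The string \N is treated as NULL, as in the k2 value of the last record.
        rows.append({k: (None if v == "\\N" else v) for k, v in row.items()})
    return rows


lines = [
    '{"k1":1,"k2":"100","k3":"beijing","k4":1.1}',
    r'{"k1":4,"k2":"\\N","k3":"chongqing","k4":1.4}',
]
rows = load_with_jsonpaths(lines, ["$.k1", "$.k2", "$.k3", "$.k4"],
                           ["col1", "col3", "col2", "col4"])
print(rows[0])  # {'col1': 1, 'col3': '100', 'col2': 'beijing', 'col4': 1.1}
```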
Assume the following source data, with one JSON object per line:
{"k1":1,"k2":"100","k3":"beijing","k4":1.1}
{"k1":2,"k2":"200","k3":"shanghai","k4":1.2}
{"k1":3,"k2":"300","k3":"guangzhou","k4":1.3}
{"k1":4,"k2":"\\N","k3":"chongqing","k4":1.4}
CREATE TABLE example_table
(
    col1 INT,
    col2 STRING,
    col3 INT,
    col4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(col1)
DISTRIBUTED BY HASH(col1) BUCKETS 1;
curl --location-trusted -u user:passwd \
    -H "columns:k1, k3, k2, k4, col1 = k1, col2 = k3, col3 = k2, col4 = k4" \
    -H "format:json" \
    -H "read_json_by_line:true" \
    -T data.json \
    -X PUT \
    http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
LOAD LABEL example_db.label_broker
(
    DATA INFILE("s3://bucket_name/data.json")
    INTO TABLE example_table
    FORMAT AS "json"
    (k1, k3, k2, k4)
    SET (
        col1 = k1,
        col2 = k3,
        col3 = k2,
        col4 = k4
    )
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k3, k2, k4, col1 = k1, col2 = k3, col3 = k2, col4 = k4)
PROPERTIES (
    "format" = "json",
    "read_json_by_line" = "true"
)
FROM KAFKA (...);
mysql> SELECT * FROM example_table;
+------+-----------+------+------+
| col1 | col2      | col3 | col4 |
+------+-----------+------+------+
|    1 | beijing   |  100 |  1.1 |
|    2 | shanghai  |  200 |  1.2 |
|    3 | guangzhou |  300 |  1.3 |
|    4 | chongqing | NULL |  1.4 |
+------+-----------+------+------+
Assume the following source data (header column names are shown only for ease of explanation; the actual data has no header):
column 1, column 2, column 3, column 4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
The target table has four columns k1, k2, k3, and k4. The mapping should be as follows:
column 1 -> k1
column 2 -> k3
column 3 -> k2
column 4 -> k4
CREATE TABLE example_table
(
    k1 INT,
    k2 STRING,
    k3 INT,
    k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
curl --location-trusted -u user:passwd \
    -H "column_separator:," \
    -H "columns: k1,k3,k2,k4" \
    -T data.csv \
    -X PUT \
    http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
LOAD LABEL example_db.label_broker
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE example_table
    COLUMNS TERMINATED BY ","
    (k1, k3, k2, k4)
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k3, k2, k4),
COLUMNS TERMINATED BY ","
FROM KAFKA (...);
mysql> select * from example_table;
+------+-----------+------+------+
| k1   | k2        | k3   | k4   |
+------+-----------+------+------+
|    2 | shanghai  |  200 |  1.2 |
|    4 | chongqing | NULL |  1.4 |
|    3 | guangzhou |  300 |  1.3 |
|    1 | beijing   |  100 |  1.1 |
+------+-----------+------+------+
Assume the following source data (header column names are shown only for ease of explanation; the actual data has no header):
column 1, column 2, column 3, column 4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
The target table has three columns k1, k2, and k3, while the source file contains four columns. Only the 1st, 2nd, and 4th columns of the source file are needed, with the following mapping:
column 1 -> k1
column 2 -> k2
column 4 -> k3
To skip certain columns in the source file, simply use any column name that does not exist in the target table during column mapping. These column names can be customized without restriction, and the data of these columns will be automatically ignored during import.
CREATE TABLE example_table
(
    k1 INT,
    k2 STRING,
    k3 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
curl --location-trusted -u user:password \
    -H "column_separator:," \
    -H "columns: k1,k2,tmp_skip,k3" \
    -T data.csv \
    http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
LOAD LABEL example_db.label_broker
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE example_table
    COLUMNS TERMINATED BY ","
    (tmp_k1, tmp_k2, tmp_skip, tmp_k3)
    SET (
        k1 = tmp_k1,
        k2 = tmp_k2,
        k3 = tmp_k3
    )
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k2, tmp_skip, k3),
COLUMNS TERMINATED BY ","
PROPERTIES (
    "format" = "csv"
)
FROM KAFKA (...);
Note:
tmp_skip in the example can be replaced with any name, as long as it is not a column of the target table.
mysql> select * from example_table;
+------+------+------+
| k1   | k2   | k3   |
+------+------+------+
|    1 | 100  |  1.1 |
|    2 | 200  |  1.2 |
|    3 | 300  |  1.3 |
|    4 | NULL |  1.4 |
+------+------+------+
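The placeholder technique can be modeled in a few lines of Python. Fields are bound to the column list by position, and only names that are columns of the target table are kept, so the value bound to tmp_skip is never written. This is a sketch of the behavior described above, not Doris code. The names TARGET_COLUMNS, COLUMN_LIST, and load_csv are hypothetical.

```python
import csv
import io

TARGET_COLUMNS = ["k1", "k2", "k3"]           # columns of example_table
COLUMN_LIST = ["k1", "k2", "tmp_skip", "k3"]  # "columns: k1,k2,tmp_skip,k3"


def load_csv(text):
    """Bind CSV fields to COLUMN_LIST by position and keep only target columns."""
    rows = []
    for fields in csv.reader(io.StringIO(text)):
        named = dict(zip(COLUMN_LIST, fields))
        # tmp_skip is not in TARGET_COLUMNS, so its value is never written.
        rows.append({c: (None if named[c] == "\\N" else named[c]) for c in TARGET_COLUMNS})
    return rows


data = "1,100,beijing,1.1\n4,\\N,chongqing,1.4\n"
for row in load_csv(data):
    print(row)
```

The third field (beijing, chongqing) is consumed by tmp_skip and disappears, which matches the query result above.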
Assume the following source data (header column names are shown only for ease of explanation; the actual data has no header):
column 1, column 2, column 3, column 4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
The target table has five columns k1, k2, k3, k4, and k5, while the source file contains only four columns. All four source columns are imported, with the following mapping:
column 1 -> k1
column 2 -> k3
column 3 -> k2
column 4 -> k4
k5 uses the default value
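Target table columns that do not appear in the column mapping are handled as follows: if the column has a default value, it is filled with that value; if it has no default value but is nullable, it is filled with NULL; if it has neither, the import reports an error.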
CREATE TABLE example_table
(
    k1 INT,
    k2 STRING,
    k3 INT,
    k4 DOUBLE,
    k5 INT DEFAULT 2
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
curl --location-trusted -u user:passwd \
    -H "column_separator:," \
    -H "columns: k1,k3,k2,k4" \
    -T data.csv \
    http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
LOAD LABEL example_db.label_broker
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE example_table
    COLUMNS TERMINATED BY ","
    (tmp_k1, tmp_k3, tmp_k2, tmp_k4)
    SET (
        k1 = tmp_k1,
        k3 = tmp_k3,
        k2 = tmp_k2,
        k4 = tmp_k4
    )
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k3, k2, k4),
COLUMNS TERMINATED BY ","
FROM KAFKA (...);
mysql> select * from example_table;
+------+-----------+------+------+------+
| k1   | k2        | k3   | k4   | k5   |
+------+-----------+------+------+------+
|    1 | beijing   |  100 |  1.1 |    2 |
|    2 | shanghai  |  200 |  1.2 |    2 |
|    3 | guangzhou |  300 |  1.3 |    2 |
|    4 | chongqing | NULL |  1.4 |    2 |
+------+-----------+------+------+------+
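Filling k5 can be modeled as a per-row completion step that runs after mapping. The sketch assumes this rule order: use the declared default value first, otherwise NULL if the column is nullable, otherwise fail the row. The schema tuple format (nullable, default), the NO_DEFAULT sentinel, and fill_missing are all hypothetical and exist only for this illustration.

```python
NO_DEFAULT = object()  # sentinel: the column declares no DEFAULT

# Hypothetical schema model of example_table: column -> (nullable, default)
SCHEMA = {
    "k1": (True, NO_DEFAULT),
    "k2": (True, NO_DEFAULT),
    "k3": (True, NO_DEFAULT),
    "k4": (True, NO_DEFAULT),
    "k5": (True, 2),          # k5 INT DEFAULT 2
}


def fill_missing(row, schema):
    """Complete a mapped row; the rule order here is an assumption for illustration."""
    out = {}
    for col, (nullable, default) in schema.items():
        if col in row:
            out[col] = row[col]
        elif default is not NO_DEFAULT:
            out[col] = default            # 1. use the declared default value
        elif nullable:
            out[col] = None               # 2. otherwise NULL, if the column allows it
        else:
            raise ValueError(f"column {col} has no default value and is not nullable")
    return out


print(fill_missing({"k1": 1, "k2": "beijing", "k3": 100, "k4": 1.1}, SCHEMA))
# {'k1': 1, 'k2': 'beijing', 'k3': 100, 'k4': 1.1, 'k5': 2}
```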
Column transformation allows you to transform column values from the source file. It supports most built-in functions. Column transformation is usually defined together with column mapping: columns are mapped first, and then transformed.
Assume the following source data (header column names are shown only for ease of explanation; the actual data has no header):
column 1, column 2, column 3, column 4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
The table has four columns k1, k2, k3, and k4. The import mapping and transformation are as follows:
column 1 -> k1
column 2 * 100 -> k3
column 3 -> k2
column 4 -> k4
CREATE TABLE example_table
(
    k1 INT,
    k2 STRING,
    k3 INT,
    k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
curl --location-trusted -u user:passwd \
    -H "column_separator:," \
    -H "columns: k1, tmp_k3, k2, k4, k3 = tmp_k3 * 100" \
    -T data.csv \
    http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
LOAD LABEL example_db.label1
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE example_table
    COLUMNS TERMINATED BY ","
    (k1, tmp_k3, k2, k4)
    SET (
        k3 = tmp_k3 * 100
    )
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, tmp_k3, k2, k4, k3 = tmp_k3 * 100),
COLUMNS TERMINATED BY ","
FROM KAFKA (...);
mysql> select * from example_table;
+------+-----------+-------+------+
| k1   | k2        | k3    | k4   |
+------+-----------+-------+------+
|    1 | beijing   | 10000 |  1.1 |
|    2 | shanghai  | 20000 |  1.2 |
|    3 | guangzhou | 30000 |  1.3 |
|    4 | chongqing |  NULL |  1.4 |
+------+-----------+-------+------+
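Row 4 has k3 = NULL rather than an error because the source value \N is read as NULL, and arithmetic on NULL yields NULL. The small Python model below reproduces that column with None standing in for NULL. transform_k3 is a hypothetical name for the expression k3 = tmp_k3 * 100.

```python
def transform_k3(tmp_k3):
    """Model of k3 = tmp_k3 * 100: a NULL input (None) produces a NULL output."""
    if tmp_k3 is None:
        return None
    return int(tmp_k3) * 100


# Column 2 of the source rows after \N has been read as NULL.
source_col2 = ["100", "200", "300", None]
print([transform_k3(v) for v in source_col2])  # [10000, 20000, 30000, None]
```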
Assume the following source data (header column names are shown only for ease of explanation; the actual data has no header):
column 1, column 2, column 3, column 4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
The table has four columns k1, k2, k3, and k4. The values beijing, shanghai, guangzhou, and chongqing in the source data are converted to their corresponding region IDs before import:
column 1 -> k1
column 2 -> k2
column 3 (after region ID conversion) -> k3
column 4 -> k4
CREATE TABLE example_table
(
    k1 INT,
    k2 INT,
    k3 INT,
    k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
curl --location-trusted -u user:passwd \
    -H "column_separator:," \
    -H "columns: k1, k2, tmp_k3, k4, k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END" \
    -T data.csv \
    http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
LOAD LABEL example_db.label1
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE example_table
    COLUMNS TERMINATED BY ","
    (k1, k2, tmp_k3, k4)
    SET (
        k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END
    )
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k2, tmp_k3, k4, k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END),
COLUMNS TERMINATED BY ","
FROM KAFKA (...);
mysql> select * from example_table;
+------+------+------+------+
| k1   | k2   | k3   | k4   |
+------+------+------+------+
|    1 |  100 |    1 |  1.1 |
|    2 |  200 |    2 |  1.2 |
|    3 |  300 |    3 |  1.3 |
|    4 | NULL |    4 |  1.4 |
+------+------+------+------+
Assume the following source data (header column names are shown only for ease of explanation; the actual data has no header):
column 1, column 2, column 3, column 4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
The table has four columns k1, k2, k3, and k4. In addition to the region ID conversion, NULL values in column 2 of the source data are converted to 0 during import:
column 1 -> k1
column 2 (NULL converted to 0) -> k2
column 3 (after region ID conversion) -> k3
column 4 -> k4
CREATE TABLE example_table
(
    k1 INT,
    k2 INT,
    k3 INT,
    k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
curl --location-trusted -u user:passwd \
    -H "column_separator:," \
    -H "columns: k1, tmp_k2, tmp_k3, k4, k2 = ifnull(tmp_k2, 0), k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END" \
    -T data.csv \
    http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
LOAD LABEL example_db.label1
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE example_table
    COLUMNS TERMINATED BY ","
    (k1, tmp_k2, tmp_k3, k4)
    SET (
        k2 = ifnull(tmp_k2, 0),
        k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END
    )
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, tmp_k2, tmp_k3, k4, k2 = ifnull(tmp_k2, 0), k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END),
COLUMNS TERMINATED BY ","
FROM KAFKA (...);
mysql> select * from example_table;
+------+------+------+------+
| k1   | k2   | k3   | k4   |
+------+------+------+------+
|    1 |  100 |    1 |  1.1 |
|    2 |  200 |    2 |  1.2 |
|    3 |  300 |    3 |  1.3 |
|    4 |    0 |    4 |  1.4 |
+------+------+------+------+
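The two expressions in this example can be modeled as plain Python functions. A dict lookup plays the role of CASE ... ELSE NULL END, and ifnull replaces only a NULL (None) value. The names REGION_IDS, ifnull, and transform are hypothetical helpers for this sketch. They are not Doris functions called from Python.

```python
REGION_IDS = {"beijing": 1, "shanghai": 2, "guangzhou": 3, "chongqing": 4}


def ifnull(value, fallback):
    """Model of ifnull(value, fallback): return fallback only when value is NULL (None)."""
    return fallback if value is None else value


def transform(k1, tmp_k2, tmp_k3, k4):
    """Model of: k2 = ifnull(tmp_k2, 0), k3 = CASE tmp_k3 WHEN ... ELSE NULL END."""
    k2 = ifnull(None if tmp_k2 is None else int(tmp_k2), 0)
    k3 = REGION_IDS.get(tmp_k3)          # unmapped names fall through to NULL, like ELSE NULL
    return (int(k1), k2, k3, float(k4))


source = [("1", "100", "beijing", "1.1"), ("4", None, "chongqing", "1.4")]
print([transform(*r) for r in source])  # [(1, 100, 1, 1.1), (4, 0, 4, 1.4)]
```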
Pre-filtering filters raw data before transformation. It can filter out data that does not need to be processed in advance, reducing the amount of data for subsequent processing and improving import efficiency. This feature is supported only by Broker Load and Routine Load.
| Limitation | Description |
|---|---|
| Filter column limitation | Pre-filtering can only filter independent simple columns in the column list, and cannot filter columns produced by expressions. For example, when the column mapping is (a, tmp, b = tmp + 1), the column b cannot be used as a filter condition. |
| Data processing limitation | Pre-filtering occurs before data transformation and uses raw data values for comparison. The raw data is treated as a string type. For example, data such as \N is compared directly as the string \N, instead of being converted to NULL before comparison. |
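The data processing limitation in the table above can be modeled in Python. At the pre-filtering stage every value is still a raw string, so a NULL check removes nothing, while a comparison with the literal string \N does match. The names raw_rows and pre_filter are hypothetical. The sketch models the comparison semantics only and does not show Doris predicate syntax.

```python
# Raw fields as they exist before column mapping: k2 of the second line is the
# two-character string \N, not yet NULL.
raw_rows = [{"k1": "1", "k2": "100"}, {"k1": "4", "k2": "\\N"}]


def pre_filter(rows, predicate):
    """Keep rows whose raw (string) values satisfy the predicate."""
    return [r for r in rows if predicate(r)]


# An IS NOT NULL style check removes nothing: no raw value is NULL at this stage.
not_null = pre_filter(raw_rows, lambda r: r["k2"] is not None)
# Comparing with the literal string \N does remove the second line.
not_backslash_n = pre_filter(raw_rows, lambda r: r["k2"] != "\\N")

print(len(not_null), len(not_backslash_n))  # 2 1
```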
This example shows how to use a simple numeric comparison condition to filter source data. By setting the filter condition k1 > 1, unwanted records are filtered out before data transformation.
Assume the following source data (header column names are shown only for ease of explanation; the actual data has no header):
column 1, column 2, column 3, column 4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
The pre-filter condition is:
column 1 > 1, that is, only data with column 1 > 1 is imported, and the rest is filtered out.
CREATE TABLE example_table
(
    k1 INT,
    k2 INT,
    k3 STRING,
    k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
LOAD LABEL example_db.label1
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE example_table
    COLUMNS TERMINATED BY ","
    (k1, k2, k3, k4)
    PRECEDING FILTER k1 > 1
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k2, k3, k4),
COLUMNS TERMINATED BY ",",
PRECEDING FILTER k1 > 1
FROM KAFKA (...);
mysql> select * from example_table;
+------+------+-----------+------+
| k1   | k2   | k3        | k4   |
+------+------+-----------+------+
|    2 |  200 | shanghai  |  1.2 |
|    3 |  300 | guangzhou |  1.3 |
|    4 | NULL | chongqing |  1.4 |
+------+------+-----------+------+
This example shows how to handle an import scenario that contains invalid data.
The source data is:
1,1
2,abc
3,3
CREATE TABLE example_table
(
    k1 INT,
    k2 INT NOT NULL
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
For the INT column k2, abc is invalid (dirty) data. To filter it out, map the second source column to an intermediate column tmp, filter on tmp, and then assign tmp to k2.
LOAD LABEL example_db.label1
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE example_table
    COLUMNS TERMINATED BY ","
    (k1, tmp)
    SET (
        k2 = tmp
    )
    PRECEDING FILTER tmp != "abc"
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, tmp, k2 = tmp),
COLUMNS TERMINATED BY ",",
PRECEDING FILTER tmp != "abc"
FROM KAFKA (...);
mysql> select * from example_table;
+------+----+
| k1   | k2 |
+------+----+
|    1 |  1 |
|    3 |  3 |
+------+----+
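The intermediate-column pattern works because the filter runs on the raw string before any conversion to INT. The hypothetical Python model below shows that ordering. The load function and its use_pre_filter flag are illustrative only, and the ValueError raised by int("abc") merely stands in for the value being rejected as dirty data.

```python
def load(lines, use_pre_filter=True):
    """Model of (k1, tmp) SET (k2 = tmp) PRECEDING FILTER tmp != "abc"."""
    rows = []
    for line in lines:
        k1, tmp = line.split(",")
        if use_pre_filter and tmp == "abc":   # pre-filter compares the raw string
            continue
        rows.append((int(k1), int(tmp)))      # k2 = tmp, cast to INT only for kept rows
    return rows


lines = ["1,1", "2,abc", "3,3"]
print(load(lines))  # [(1, 1), (3, 3)]
```

Calling load(lines, use_pre_filter=False) raises ValueError on the second line, because the cast is attempted on abc.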
Post-filtering is performed after data transformation and can filter based on the transformed result.
Assume the following source data (header column names are shown only for ease of explanation; the actual data has no header):
column 1, column 2, column 3, column 4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
The table has four columns k1, k2, k3, and k4. Without column mapping or transformation, only the rows where the 4th column of the source file is greater than 1.2 are imported.
CREATE TABLE example_table
(
    k1 INT,
    k2 INT,
    k3 STRING,
    k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
curl --location-trusted -u user:passwd \
    -H "column_separator:," \
    -H "columns: k1, k2, k3, k4" \
    -H "where: k4 > 1.2" \
    -T data.csv \
    http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
LOAD LABEL example_db.label1
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE example_table
    COLUMNS TERMINATED BY ","
    (k1, k2, k3, k4)
    WHERE k4 > 1.2
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k2, k3, k4),
COLUMNS TERMINATED BY ",",
WHERE k4 > 1.2
FROM KAFKA (...);
mysql> select * from example_table;
+------+------+-----------+------+
| k1   | k2   | k3        | k4   |
+------+------+-----------+------+
|    3 |  300 | guangzhou |  1.3 |
|    4 | NULL | chongqing |  1.4 |
+------+------+-----------+------+
Assume the following source data (header column names are shown only for ease of explanation; the actual data has no header):
column 1, column 2, column 3, column 4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
The table has four columns k1, k2, k3, and k4. As in the column transformation example, the region name is converted to an ID. In addition, rows whose ID is 3 are filtered out.
CREATE TABLE example_table
(
    k1 INT,
    k2 INT,
    k3 INT,
    k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
curl --location-trusted -u user:passwd \
    -H "column_separator:," \
    -H "columns: k1, k2, tmp_k3, k4, k3 = case tmp_k3 when 'beijing' then 1 when 'shanghai' then 2 when 'guangzhou' then 3 when 'chongqing' then 4 else null end" \
    -H "where: k3 != 3" \
    -T data.csv \
    http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
LOAD LABEL example_db.label1
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE example_table
    COLUMNS TERMINATED BY ","
    (k1, k2, tmp_k3, k4)
    SET (
        k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END
    )
    WHERE k3 != 3
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k2, tmp_k3, k4, k3 = CASE tmp_k3 WHEN 'beijing' THEN 1 WHEN 'shanghai' THEN 2 WHEN 'guangzhou' THEN 3 WHEN 'chongqing' THEN 4 ELSE NULL END),
COLUMNS TERMINATED BY ",",
WHERE k3 != 3
FROM KAFKA (...);
mysql> select * from example_table;
+------+------+------+------+
| k1   | k2   | k3   | k4   |
+------+------+------+------+
|    1 |  100 |    1 |  1.1 |
|    2 |  200 |    2 |  1.2 |
|    4 | NULL |    4 |  1.4 |
+------+------+------+------+
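Because post-filtering runs on transformed values, WHERE k3 != 3 sees the region IDs, not the city names. The Python sketch below models that step and assumes standard SQL three-valued logic, where a comparison with NULL is not TRUE. Under that assumption, a row whose city is not in the CASE list (k3 = NULL via ELSE NULL) is also filtered out. The tianjin row is hypothetical and was added only to show this, as are the helper names.

```python
REGION_IDS = {"beijing": 1, "shanghai": 2, "guangzhou": 3, "chongqing": 4}


def sql_not_equal(a, b):
    """SQL-style !=: a comparison involving NULL (None) yields NULL, not True."""
    if a is None or b is None:
        return None
    return a != b


def post_filter(rows):
    """Keep rows where WHERE k3 != 3 evaluates to TRUE; NULL counts as not TRUE."""
    return [r for r in rows if sql_not_equal(r["k3"], 3) is True]


rows = [{"k1": k1, "k3": REGION_IDS.get(city)}
        for k1, city in [(1, "beijing"), (2, "shanghai"), (3, "guangzhou"),
                         (4, "chongqing"), (5, "tianjin")]]
print([r["k1"] for r in post_filter(rows)])  # [1, 2, 4]
```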
Assume the following source data (header column names are shown only for ease of explanation; the actual data has no header):
column 1, column 2, column 3, column 4
1,100,beijing,1.1
2,200,shanghai,1.2
3,300,guangzhou,1.3
4,\N,chongqing,1.4
The table has four columns k1, k2, k3, and k4. Filter out rows where the k1 column is null, and also filter out rows where the k4 column is less than or equal to 1.2.
CREATE TABLE example_table
(
    k1 INT,
    k2 INT,
    k3 STRING,
    k4 DOUBLE
)
ENGINE = OLAP
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1;
curl --location-trusted -u user:passwd \
    -H "column_separator:," \
    -H "columns: k1, k2, k3, k4" \
    -H "where: k1 is not null and k4 > 1.2" \
    -T data.csv \
    http://<fe_ip>:<fe_http_port>/api/example_db/example_table/_stream_load
LOAD LABEL example_db.label1
(
    DATA INFILE("s3://bucket_name/data.csv")
    INTO TABLE example_table
    COLUMNS TERMINATED BY ","
    (k1, k2, k3, k4)
    WHERE k1 is not null and k4 > 1.2
)
WITH s3 (...);
CREATE ROUTINE LOAD example_db.example_routine_load ON example_table
COLUMNS(k1, k2, k3, k4),
COLUMNS TERMINATED BY ",",
WHERE k1 is not null and k4 > 1.2
FROM KAFKA (...);
mysql> select * from example_table;
+------+------+-----------+------+
| k1   | k2   | k3        | k4   |
+------+------+-----------+------+
|    3 |  300 | guangzhou |  1.3 |
|    4 | NULL | chongqing |  1.4 |
+------+------+-----------+------+
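The condition k4 > 1.2 is strict, so the row with k4 = 1.2 is excluded along with k4 = 1.1. The short Python model below evaluates the same predicate. The last source row, with a NULL k1, is hypothetical and added only to exercise the first condition, since none of the sample rows has a NULL k1.

```python
source = [(1, 1.1), (2, 1.2), (3, 1.3), (4, 1.4), (None, 1.5)]  # last row is hypothetical


def keep(k1, k4):
    """Model of WHERE k1 IS NOT NULL AND k4 > 1.2."""
    return k1 is not None and k4 > 1.2


print([k1 for k1, k4 in source if keep(k1, k4)])  # [3, 4]
```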
Can Stream Load use PRECEDING FILTER?
No. Stream Load does not support pre-filtering and can only perform post-filtering through the where parameter. If pre-filtering is required, use Broker Load or Routine Load.
Why does pre-filtering not treat \N as NULL?
Pre-filtering occurs before data transformation and compares raw data values, which are treated as strings. The value \N is therefore compared as the literal string \N instead of being converted to NULL first. To filter by NULL, use post-filtering (WHERE).
To skip unwanted columns in the source file, assign each unwanted column a name that does not exist in the target table (such as tmp_skip) in the column mapping list. These temporary column names are placeholders only and are automatically ignored during import.
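Target table columns that are missing from the source data are filled according to the following rules: if the column has a default value, the default value is used; if it has no default value but is nullable, NULL is used; if it has neither, the import reports an error.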
Column transformation supports most built-in functions, such as ifnull, CASE WHEN, string functions, date functions, and arithmetic operations. They can be used in the Stream Load columns header, the Broker Load SET clause, and the Routine Load COLUMNS clause.
Pre-filtering can only filter independent simple columns and cannot filter columns generated by expressions (such as b = tmp + 1). Use post-filtering (WHERE) to reference such columns instead.