SQL 转换插件
使用 SQL 来转换给定的输入行。
SQL 转换使用内存中的 SQL 引擎,我们可以通过 SQL 函数和 SQL 引擎的能力来实现转换任务。
名称 | 类型 | 是否必须 | 默认值 |
---|---|---|---|
plugin_input | string | yes | - |
plugin_output | string | yes | - |
query | string | yes | - |
源表名称,查询 SQL 表名称必须与此字段匹配。
查询 SQL,它是一个简单的 SQL,支持基本的函数和条件过滤操作。但是,复杂的 SQL 尚不支持,包括:多源表/行连接和聚合操作等。
查询表达式可以是select [table_name.]column_a
,这时会去查询列为column_a
的列,table_name
为可选项 也可以是select c_row.c_inner_row.column_b
,这时会去查询列c_row
下的c_inner_row
的column_b
。嵌套结构查询中,不能存在table_name
源端数据读取的表格如下:
id | name | age |
---|---|---|
1 | Joy Ding | 20 |
2 | May Ding | 21 |
3 | Kin Dom | 24 |
4 | Joy Dom | 22 |
我们使用 SQL 查询来转换源数据,类似这样:
transform { Sql { plugin_input = "fake" plugin_output = "fake1" query = "select id, concat(name, '_') as name, age+1 as age from dual where id>0" } }
那么结果表 fake1
中的数据将会更新为:
id | name | age |
---|---|---|
1 | Joy Ding_ | 21 |
2 | May Ding_ | 22 |
3 | Kin Dom_ | 25 |
4 | Joy Dom_ | 23 |
例如你的上游数据结构是这样:
source { FakeSource { plugin_output = "fake" row.num = 100 string.template = ["innerQuery"] schema = { fields { name = "string" c_date = "date" c_row = { c_inner_row = { c_inner_int = "int" c_inner_string = "string" c_inner_timestamp = "timestamp" c_map_1 = "map<string, string>" c_map_2 = "map<string, map<string,string>>" } c_string = "string" } } } } }
那么下列所有的查询表达式都是有效的
select name, c_date, c_row, c_row.c_inner_row, c_row.c_string, c_row.c_inner_row.c_inner_int, c_row.c_inner_row.c_inner_string, c_row.c_inner_row.c_inner_timestamp, c_row.c_inner_row.c_map_1, c_row.c_inner_row.c_map_1.some_key
但是这个查询语句是无效的
select c_row.c_inner_row.c_map_2.some_key.inner_map_key
当查询map结构时,map结构应该为最后一个数据结构,不能查询嵌套map
env { job.mode = "BATCH" } source { FakeSource { plugin_output = "fake" row.num = 100 schema = { fields { id = "int" name = "string" age = "int" } } } } transform { Sql { plugin_input = "fake" plugin_output = "fake1" query = "select id, concat(name, '_') as name, age+1 as age from dual where id>0" } } sink { Console { plugin_input = "fake1" } }