import ChangeLog from ‘../changelog/connector-starrocks.md’;
StarRocks 源连接器
通过StarRocks
读取外部数据源数据。 StarRocks
源连接器的内部实现是从FE
获取查询计划, 将查询计划作为参数传递给BE
节点,然后从BE
节点获取数据结果。
名称 | 类型 | 是否必须 | 默认值 |
---|---|---|---|
nodeUrls | list | 是 | - |
username | string | 是 | - |
password | string | 是 | - |
database | string | 是 | - |
table | string | 否 | - |
scan_filter | string | 否 | - |
schema | config | 是 | - |
table_list | array | 否 | - |
request_tablet_size | int | 否 | Integer.MAX_VALUE |
scan_connect_timeout_ms | int | 否 | 30000 |
scan_query_timeout_sec | int | 否 | 3600 |
scan_keep_alive_min | int | 否 | 10 |
scan_batch_rows | int | 否 | 1024 |
scan_mem_limit | long | 否 | 2147483648 |
max_retries | int | 否 | 3 |
scan.params.* | string | 否 | - |
StarRocks
集群地址配置格式 ["fe_ip:fe_http_port", ...]
。
StarRocks
用户名称。
StarRocks
用户密码。
StarRocks
数据库名。
StarRocks
表名。
过滤查询的表达式,该表达式透明地传输到StarRocks
。StarRocks
使用此表达式完成源端数据过滤。
例如
"tinyint_1 = 100"
要生成的starRocks
的schema
示例
schema { fields { name = string age = int } }
StarRocks
表名列表,当需要同时读取多表时使用此配置代替 table
与分区对应的StarRocks tablet
的数量。此值设置得越小,生成的分区就越多。这将增加引擎的平行度,但同时也会给StarRocks
造成更大的压力。
以下示例,用于解释如何使用request_tablet_size
来控制分区的生成。
StarRocks 集群中表的 tablet 分布作为 follower be_node_1 tablet[1, 2, 3, 4, 5] be_node_2 tablet[6, 7, 8, 9, 10] be_node_3 tablet[11, 12, 13, 14, 15] 1.如果没有设置 request_tablet_size,则单个分区中的 tablet 数量将没有限制。分区将按以下方式生成: partition[0] 从 be_node_1 读取 tablet 数据:tablet[1, 2, 3, 4, 5] partition[1] 从 be_node_2 读取 tablet 数据:tablet[6, 7, 8, 9, 10] partition[2] 从 be_node_3 读取 tablet 数据:tablet[11, 12, 13, 14, 15] 2.如果设置了 request_tablet_size=3,则每个分区中最多包含 3 个 tablet。分区将按以下方式生成 partition[0] 从 be_node_1 读取 tablet 数据:tablet[1, 2, 3] partition[1] 从 be_node_1 读取 tablet 数据:tablet[4, 5] partition[2] 从 be_node_2 读取 tablet 数据:tablet[6, 7, 8] partition[3] 从 be_node_2 读取 tablet 数据:tablet[9, 10] partition[4] 从 be_node_3 读取 tablet 数据:tablet[11, 12, 13] partition[5] 从 be_node_3 读取 tablet 数据:tablet[14,15]
发送到 StarRocks
的请求连接超时。
在 StarRocks
中,查询超时时间的默认值为 1 小时,-1 表示没有超时限制。
查询任务的保持连接时长,单位是分钟,默认值为 10 分钟。我们建议将此参数设置为大于或等于 5 的值。
一次从 BE
节点读取的最大数据行数。增加此值可以减少引擎与 StarRocks
之间建立的连接数量,从而减轻由网络延迟引起的开销。
单个查询在 BE 节点上允许的最大内存空间,单位为字节,默认值为 2147483648 字节(即 2 GB)。
发送到 StarRocks
的重试请求次数。
从 BE
节点扫描数据相关的参数。
source { StarRocks { nodeUrls = ["starrocks_e2e:8030"] username = root password = "" database = "test" table = "e2e_table_source" scan_batch_rows = 10 max_retries = 3 schema { fields { BIGINT_COL = BIGINT LARGEINT_COL = STRING SMALLINT_COL = SMALLINT TINYINT_COL = TINYINT BOOLEAN_COL = BOOLEAN DECIMAL_COL = "DECIMAL(20, 1)" DOUBLE_COL = DOUBLE FLOAT_COL = FLOAT INT_COL = INT CHAR_COL = STRING VARCHAR_11_COL = STRING STRING_COL = STRING DATETIME_COL = TIMESTAMP DATE_COL = DATE } } scan.params.scanner_thread_pool_thread_num = "3" } }
source { StarRocks { nodeUrls = ["starrocks_e2e:8030"] username = root password = "" database = "test" table_list = [ { table = "e2e_table_source" schema = { fields { BIGINT_COL = BIGINT LARGEINT_COL = STRING SMALLINT_COL = SMALLINT TINYINT_COL = TINYINT BOOLEAN_COL = BOOLEAN DECIMAL_COL = "DECIMAL(20, 1)" DOUBLE_COL = DOUBLE FLOAT_COL = FLOAT INT_COL = INT CHAR_COL = STRING VARCHAR_11_COL = STRING STRING_COL = STRING DATETIME_COL = TIMESTAMP DATE_COL = DATE } } }, { table = "e2e_table_source_2" schema = { fields { BIGINT_COL_2 = BIGINT LARGEINT_COL_2 = STRING SMALLINT_COL_2 = SMALLINT TINYINT_COL_2 = TINYINT BOOLEAN_COL_2 = BOOLEAN DECIMAL_COL_2 = "DECIMAL(20, 1)" DOUBLE_COL_2 = DOUBLE FLOAT_COL_2 = FLOAT INT_COL_2 = INT CHAR_COL_2 = STRING VARCHAR_11_COL_2 = STRING STRING_COL_2 = STRING DATETIME_COL_2 = TIMESTAMP DATE_COL_2 = DATE } } }] scan_batch_rows = 10 max_retries = 3 scan.params.scanner_thread_pool_thread_num = "3" } }