import ChangeLog from ‘../changelog/connector-starrocks.md’;

StarRocks

StarRocks 源连接器

描述

通过StarRocks读取外部数据源数据。 StarRocks源连接器的内部实现是从FE获取查询计划, 将查询计划作为参数传递给BE节点,然后从BE节点获取数据结果。

主要功能

配置选项

名称类型是否必须默认值
nodeUrlslist-
usernamestring-
passwordstring-
databasestring-
tablestring-
scan_filterstring-
schemaconfig-
table_listarray-
request_tablet_sizeintInteger.MAX_VALUE
scan_connect_timeout_msint30000
scan_query_timeout_secint3600
scan_keep_alive_minint10
scan_batch_rowsint1024
scan_mem_limitlong2147483648
max_retriesint3
scan.params.*string-

nodeUrls [list]

StarRocks 集群地址配置格式 ["fe_ip:fe_http_port", ...]

username [string]

StarRocks 用户名称。

password [string]

StarRocks 用户密码。

database [string]

StarRocks 数据库名。

table [string]

StarRocks 表名。

scan_filter [string]

过滤查询的表达式,该表达式透明地传输到StarRocksStarRocks 使用此表达式完成源端数据过滤。

例如

"tinyint_1 = 100"

schema [config]

fields [Config]

要生成的starRocksschema

示例

schema {
    fields {
        name = string
        age = int
    }
  }

table_list [array]

StarRocks 表名列表,当需要同时读取多表时使用此配置代替 table

request_tablet_size [int]

与分区对应的StarRocks tablet的数量。此值设置得越小,生成的分区就越多。这将增加引擎的平行度,但同时也会给StarRocks造成更大的压力。

以下示例,用于解释如何使用request_tablet_size来控制分区的生成。

StarRocks 集群中表的 tablet 分布作为 follower

be_node_1 tablet[1, 2, 3, 4, 5]
be_node_2 tablet[6, 7, 8, 9, 10]
be_node_3 tablet[11, 12, 13, 14, 15]

1.如果没有设置 request_tablet_size,则单个分区中的 tablet 数量将没有限制。分区将按以下方式生成:

partition[0] 从 be_node_1 读取 tablet 数据:tablet[1, 2, 3, 4, 5]
partition[1] 从 be_node_2 读取 tablet 数据:tablet[6, 7, 8, 9, 10]
partition[2] 从 be_node_3 读取 tablet 数据:tablet[11, 12, 13, 14, 15]

2.如果设置了 request_tablet_size=3,则每个分区中最多包含 3 个 tablet。分区将按以下方式生成

partition[0] 从 be_node_1 读取 tablet 数据:tablet[1, 2, 3]
partition[1] 从 be_node_1 读取 tablet 数据:tablet[4, 5]
partition[2] 从 be_node_2 读取 tablet 数据:tablet[6, 7, 8]
partition[3] 从 be_node_2 读取 tablet 数据:tablet[9, 10]
partition[4] 从 be_node_3 读取 tablet 数据:tablet[11, 12, 13]
partition[5] 从 be_node_3 读取 tablet 数据:tablet[14,15]

scan_connect_timeout_ms [int]

发送到 StarRocks 的请求连接超时。

scan_query_timeout_sec [int]

StarRocks 中,查询超时时间的默认值为 1 小时,-1 表示没有超时限制。

scan_keep_alive_min [int]

查询任务的保持连接时长,单位是分钟,默认值为 10 分钟。我们建议将此参数设置为大于或等于 5 的值。

scan_batch_rows [int]

一次从 BE 节点读取的最大数据行数。增加此值可以减少引擎与 StarRocks 之间建立的连接数量,从而减轻由网络延迟引起的开销。

scan_mem_limit [long]

单个查询在 BE 节点上允许的最大内存空间,单位为字节,默认值为 2147483648 字节(即 2 GB)。

max_retries [int]

发送到 StarRocks 的重试请求次数。

scan.params. [string]

BE 节点扫描数据相关的参数。

示例 1

source {
  StarRocks {
    nodeUrls = ["starrocks_e2e:8030"]
    username = root
    password = ""
    database = "test"
    table = "e2e_table_source"
    scan_batch_rows = 10
    max_retries = 3
    schema {
        fields {
           BIGINT_COL = BIGINT
           LARGEINT_COL = STRING
           SMALLINT_COL = SMALLINT
           TINYINT_COL = TINYINT
           BOOLEAN_COL = BOOLEAN
           DECIMAL_COL = "DECIMAL(20, 1)"
           DOUBLE_COL = DOUBLE
           FLOAT_COL = FLOAT
           INT_COL = INT
           CHAR_COL = STRING
           VARCHAR_11_COL = STRING
           STRING_COL = STRING
           DATETIME_COL = TIMESTAMP
           DATE_COL = DATE
        }
    }
    scan.params.scanner_thread_pool_thread_num = "3"
    
  }
}

示例 2: 读取多表

source {
  StarRocks {
    nodeUrls = ["starrocks_e2e:8030"]
    username = root
    password = ""
    database = "test"
    table_list = [
    {
        table = "e2e_table_source"
        schema = {
            fields {
               BIGINT_COL = BIGINT
               LARGEINT_COL = STRING
               SMALLINT_COL = SMALLINT
               TINYINT_COL = TINYINT
               BOOLEAN_COL = BOOLEAN
               DECIMAL_COL = "DECIMAL(20, 1)"
               DOUBLE_COL = DOUBLE
               FLOAT_COL = FLOAT
               INT_COL = INT
               CHAR_COL = STRING
               VARCHAR_11_COL = STRING
               STRING_COL = STRING
               DATETIME_COL = TIMESTAMP
               DATE_COL = DATE
            }
        }
    },
    {
        table = "e2e_table_source_2"
        schema = {
            fields {
               BIGINT_COL_2 = BIGINT
               LARGEINT_COL_2 = STRING
               SMALLINT_COL_2 = SMALLINT
               TINYINT_COL_2 = TINYINT
               BOOLEAN_COL_2 = BOOLEAN
               DECIMAL_COL_2 = "DECIMAL(20, 1)"
               DOUBLE_COL_2 = DOUBLE
               FLOAT_COL_2 = FLOAT
               INT_COL_2 = INT
               CHAR_COL_2 = STRING
               VARCHAR_11_COL_2 = STRING
               STRING_COL_2 = STRING
               DATETIME_COL_2 = TIMESTAMP
               DATE_COL_2 = DATE
            }
        }
    }]
    scan_batch_rows = 10
    max_retries = 3
    scan.params.scanner_thread_pool_thread_num = "3"
    
  }
}

变更日志