import ChangeLog from ‘../changelog/connector-easysearch.md’;
Spark
Flink
SeaTunnel Zeta
一个使用将数据发送到 INFINI Easysearch
的接收器插件.
:::提示
支持的引擎
:::
Easysearch 数据类型 | SeaTunnel 数据类型 |
---|---|
STRING KEYWORD TEXT | STRING |
BOOLEAN | BOOLEAN |
BYTE | BYTE |
SHORT | SHORT |
INTEGER | INT |
LONG | LONG |
FLOAT HALF_FLOAT | FLOAT |
DOUBLE | DOUBLE |
Date | LOCAL_DATE_TIME_TYPE |
名称 | 类型 | 必需 | 默认值 |
---|---|---|---|
hosts | array | 是 | - |
index | string | 是 | - |
primary_keys | list | 否 | |
key_delimiter | string | 否 | _ |
username | string | 否 | |
password | string | 否 | |
max_retry_count | int | 否 | 3 |
max_batch_size | int | 否 | 10 |
tls_verify_certificate | boolean | 否 | true |
tls_verify_hostnames | boolean | 否 | true |
tls_keystore_path | string | 否 | - |
tls_keystore_password | string | 否 | - |
tls_truststore_path | string | 否 | - |
tls_truststore_password | string | 否 | - |
schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST |
data_save_mode | enum | 否 | APPEND_DATA |
common-options | 否 | - |
INFINI Easysearch
集群http地址,格式为 host:port
, 允许指定多个主机.例如 ["host1:9200", "host2:9200"]
.
INFINI Easysearch
index
名称.索引支持包含字段名变量,例如 seatunnel_${age}
,该字段必须出现在seatunnel行. 如果没有,我们将把它当作一个正常的索引.
用于生成文档 _id
的主键字段,这是cdc必需的选项.
复合键的分隔符 (默认为"_" ), 例如, “$” 将导致文档 _id
“KEY1$KEY2$KEY3”.
安全用户名
安全密码
一个批量请求的最大尝试大小
批量文档最大大小
为HTTPS端点启用证书验证
为HTTPS端点启用主机名验证
PEM或JKS密钥存储的路径。运行SeaTunnel的操作系统用户必须能够读取此文件.
指定密钥存储的密钥密码
PEM或JKS信任存储的路径。运行SeaTunnel的操作系统用户必须能够读取此文件.
指定的信任存储的密钥密码
在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案:
RECREATE_SCHEMA
:当表不存在时会创建,当表已存在时会删除并重建CREATE_SCHEMA_WHEN_NOT_EXIST
:当表不存在时会创建,当表已存在时则跳过创建ERROR_WHEN_SCHEMA_NOT_EXIST
:当表不存在时将抛出错误IGNORE
:忽略对表的处理在启动同步任务之前,针对目标端已有的数据选择不同的处理方案:
DROP_DATA
:保留数据库结构并删除数据APPEND_DATA
:保留数据库结构,保留数据ERROR_WHEN_DATA_EXISTS
:有数据时报错接收器插件常用参数,详见 Sink Common Options
简单的例子
sink { Easysearch { hosts = ["localhost:9200"] index = "seatunnel-${age}" } }
CDC(变更数据捕获) 事件
sink { Easysearch { hosts = ["localhost:9200"] index = "seatunnel-${age}" # cdc required options primary_keys = ["key1", "key2", ...] } }
SSL (禁用证书验证)
sink { Easysearch { hosts = ["https://localhost:9200"] username = "admin" password = "admin" tls_verify_certificate = false } }
SSL (禁用主机名验证)
sink { Easysearch { hosts = ["https://localhost:9200"] username = "admin" password = "admin" tls_verify_hostname = false } }
SSL (启用证书验证)
sink { Easysearch { hosts = ["https://localhost:9200"] username = "admin" password = "admin" tls_keystore_path = "${your Easysearch home}/config/certs/http.p12" tls_keystore_password = "${your password}" } }
配置表生成策略
sink { Easysearch { hosts = ["https://localhost:9200"] username = "admin" password = "admin" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode = "APPEND_DATA" } }