import ChangeLog from ‘../changelog/connector-easysearch.md’;

INFINI Easysearch

支持以下引擎

Spark
Flink
SeaTunnel Zeta

描述

一个使用将数据发送到 INFINI Easysearch 的接收器插件.

使用依赖

依赖 easysearch-client

关键特性

:::提示

支持的引擎

:::

数据类型映射

Easysearch 数据类型SeaTunnel 数据类型
STRING
KEYWORD
TEXT
STRING
BOOLEANBOOLEAN
BYTEBYTE
SHORTSHORT
INTEGERINT
LONGLONG
FLOAT
HALF_FLOAT
FLOAT
DOUBLEDOUBLE
DateLOCAL_DATE_TIME_TYPE

接收器选项

名称类型必需默认值
hostsarray-
indexstring-
primary_keyslist
key_delimiterstring_
usernamestring
passwordstring
max_retry_countint3
max_batch_sizeint10
tls_verify_certificatebooleantrue
tls_verify_hostnamesbooleantrue
tls_keystore_pathstring-
tls_keystore_passwordstring-
tls_truststore_pathstring-
tls_truststore_passwordstring-
schema_save_modeenumCREATE_SCHEMA_WHEN_NOT_EXIST
data_save_modeenumAPPEND_DATA
common-options-

hosts [array]

INFINI Easysearch 集群http地址,格式为 host:port , 允许指定多个主机.例如 ["host1:9200", "host2:9200"].

index [string]

INFINI Easysearch index 名称.索引支持包含字段名变量,例如 seatunnel_${age},该字段必须出现在seatunnel行. 如果没有,我们将把它当作一个正常的索引.

primary_keys [list]

用于生成文档 _id的主键字段,这是cdc必需的选项.

key_delimiter [string]

复合键的分隔符 (默认为"_" ), 例如, “$” 将导致文档 _id “KEY1$KEY2$KEY3”.

username [string]

安全用户名

password [string]

安全密码

max_retry_count [int]

一个批量请求的最大尝试大小

max_batch_size [int]

批量文档最大大小

tls_verify_certificate [boolean]

为HTTPS端点启用证书验证

tls_verify_hostname [boolean]

为HTTPS端点启用主机名验证

tls_keystore_path [string]

PEM或JKS密钥存储的路径。运行SeaTunnel的操作系统用户必须能够读取此文件.

tls_keystore_password [string]

指定密钥存储的密钥密码

tls_truststore_path [string]

PEM或JKS信任存储的路径。运行SeaTunnel的操作系统用户必须能够读取此文件.

tls_truststore_password [string]

指定的信任存储的密钥密码

schema_save_mode [enum]

在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案:

  • RECREATE_SCHEMA:当表不存在时会创建,当表已存在时会删除并重建
  • CREATE_SCHEMA_WHEN_NOT_EXIST:当表不存在时会创建,当表已存在时则跳过创建
  • ERROR_WHEN_SCHEMA_NOT_EXIST:当表不存在时将抛出错误
  • IGNORE:忽略对表的处理

data_save_mode [enum]

在启动同步任务之前,针对目标端已有的数据选择不同的处理方案:

  • DROP_DATA:保留数据库结构并删除数据
  • APPEND_DATA:保留数据库结构,保留数据
  • ERROR_WHEN_DATA_EXISTS:有数据时报错

common options

接收器插件常用参数,详见 Sink Common Options

示例

简单的例子

sink {
    Easysearch {
        hosts = ["localhost:9200"]
        index = "seatunnel-${age}"
    }
}

CDC(变更数据捕获) 事件

sink {
    Easysearch {
        hosts = ["localhost:9200"]
        index = "seatunnel-${age}"

        # cdc required options
        primary_keys = ["key1", "key2", ...]
    }
}

SSL (禁用证书验证)

sink {
    Easysearch {
        hosts = ["https://localhost:9200"]
        username = "admin"
        password = "admin"

        tls_verify_certificate = false
    }
}

SSL (禁用主机名验证)

sink {
    Easysearch {
        hosts = ["https://localhost:9200"]
        username = "admin"
        password = "admin"

        tls_verify_hostname = false
    }
}

SSL (启用证书验证)

sink {
    Easysearch {
        hosts = ["https://localhost:9200"]
        username = "admin"
        password = "admin"

        tls_keystore_path = "${your Easysearch home}/config/certs/http.p12"
        tls_keystore_password = "${your password}"
    }
}

配置表生成策略

sink {
    Easysearch {
        hosts = ["https://localhost:9200"]
        username = "admin"
        password = "admin"

        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        data_save_mode = "APPEND_DATA"
    }
}

变更日志