import ChangeLog from '../changelog/connector-easysearch.md';

Supported engines: Spark, Flink, SeaTunnel Zeta

A sink plugin that sends data to INFINI Easysearch.

Dependency: easysearch-client

:::tip
Engine Supported
:::
Easysearch Data Type | SeaTunnel Data Type |
---|---|
STRING, KEYWORD, TEXT | STRING |
BOOLEAN | BOOLEAN |
BYTE | BYTE |
SHORT | SHORT |
INTEGER | INT |
LONG | LONG |
FLOAT, HALF_FLOAT | FLOAT |
DOUBLE | DOUBLE |
Date | LOCAL_DATE_TIME_TYPE |

name | type | required | default value |
---|---|---|---|
hosts | array | yes | - |
index | string | yes | - |
primary_keys | list | no | |
key_delimiter | string | no | _ |
username | string | no | |
password | string | no | |
max_retry_count | int | no | 3 |
max_batch_size | int | no | 10 |
tls_verify_certificate | boolean | no | true |
tls_verify_hostname | boolean | no | true |
tls_keystore_path | string | no | - |
tls_keystore_password | string | no | - |
tls_truststore_path | string | no | - |
tls_truststore_password | string | no | - |
schema_save_mode | enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST |
data_save_mode | enum | no | APPEND_DATA |
common-options | | no | - |

hosts [array]

INFINI Easysearch cluster HTTP address, in the format `host:port`. Multiple hosts can be specified, such as `["host1:9200", "host2:9200"]`.

index [string]

INFINI Easysearch index name. The index name can contain field-name variables, such as `seatunnel_${age}`, and the referenced field must be present in the SeaTunnel row. Otherwise, the name is treated as a normal index name.
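
As a minimal sketch of the variable form described above, the fragment below derives the index name from a row field. The field name `age` and the resolved index name in the comments are assumptions for illustration; how the name is resolved is up to the connector.

```hocon
sink {
    Easysearch {
        # Several cluster nodes can be listed, each as host:port.
        hosts = ["host1:9200", "host2:9200"]
        # Assumes every upstream row carries an `age` field.
        # Under that assumption, a row with age = 18 would be expected
        # to be written to an index named "seatunnel_18".
        index = "seatunnel_${age}"
    }
}
```
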

primary_keys [list]

Primary key fields used to generate the document `_id`. This option is required for CDC.

key_delimiter [string]

Delimiter for composite keys (`_` by default). For example, `$` would result in the document `_id` `KEY1$KEY2$KEY3`.
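
The two key options can be combined as in the following sketch. The field names `key1` and `key2` and the index name `seatunnel` are hypothetical placeholders, not values taken from a real job.

```hocon
sink {
    Easysearch {
        hosts = ["localhost:9200"]
        index = "seatunnel"
        # Hypothetical key fields; replace them with fields from your rows.
        primary_keys = ["key1", "key2"]
        # With "$" as the delimiter, the document _id would take the
        # form KEY1$KEY2 for a two-field key.
        key_delimiter = "$"
    }
}
```
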
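
username [string]

Security username.

password [string]

Security password.

max_retry_count [int]

Maximum number of attempts for one bulk request.

max_batch_size [int]

Maximum number of documents in one bulk batch.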
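
A sketch of how the credential and bulk options might be set together. The numeric values are illustrative only, not tuning recommendations, and the index name `seatunnel` is a placeholder.

```hocon
sink {
    Easysearch {
        hosts = ["localhost:9200"]
        index = "seatunnel"
        username = "admin"
        password = "admin"
        # Illustrative values; the defaults are 3 and 10 respectively.
        max_retry_count = 5
        max_batch_size = 100
    }
}
```
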

tls_verify_certificate [boolean]

Enable certificate validation for HTTPS endpoints.

tls_verify_hostname [boolean]

Enable hostname validation for HTTPS endpoints.

tls_keystore_path [string]

The path to the PEM or JKS key store. This file must be readable by the operating system user running SeaTunnel.

tls_keystore_password [string]

The key password for the specified key store.

tls_truststore_path [string]

The path to the PEM or JKS trust store. This file must be readable by the operating system user running SeaTunnel.

tls_truststore_password [string]

The key password for the specified trust store.
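
The trust store options could be used as in this sketch. The file name `truststore.jks` and the `${...}` placeholders are assumptions to be replaced with values from your own deployment.

```hocon
sink {
    Easysearch {
        hosts = ["https://localhost:9200"]
        username = "admin"
        password = "admin"
        # Keep certificate validation on (the default) and point the
        # connector at a trust store that contains the cluster's CA.
        tls_verify_certificate = true
        # Hypothetical path; the file must be readable by the OS user
        # that runs SeaTunnel.
        tls_truststore_path = "${your Easysearch home}/config/certs/truststore.jks"
        tls_truststore_password = "${your password}"
    }
}
```
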

schema_save_mode [enum]

Choose how to handle the target-side schema before starting the synchronization task:

- `RECREATE_SCHEMA`: Creates the table if it doesn't exist, and deletes and recreates it if it does.
- `CREATE_SCHEMA_WHEN_NOT_EXIST`: Creates the table if it doesn't exist, skips creation if it does.
- `ERROR_WHEN_SCHEMA_NOT_EXIST`: Throws an error if the table doesn't exist.
- `IGNORE`: Ignores schema handling.

data_save_mode [enum]

Choose how to handle the target-side data before starting the synchronization task:

- `DROP_DATA`: Preserves the database structure and deletes the data.
- `APPEND_DATA`: Preserves the database structure and the data.
- `ERROR_WHEN_DATA_EXISTS`: Reports an error when data exists.

common options

Sink plugin common parameters. Refer to Sink Common Options for details.
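
As a sketch of a non-default combination, the fragment below pairs `RECREATE_SCHEMA` with `DROP_DATA`. Going by the descriptions above, this pairing discards whatever already exists on the target, so it is shown only to illustrate the syntax. The index name `seatunnel` is a placeholder.

```hocon
sink {
    Easysearch {
        hosts = ["https://localhost:9200"]
        username = "admin"
        password = "admin"
        index = "seatunnel"
        # Destructive: the target is recreated and existing data removed.
        schema_save_mode = "RECREATE_SCHEMA"
        data_save_mode = "DROP_DATA"
    }
}
```
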

Simple

    sink {
        Easysearch {
            hosts = ["localhost:9200"]
            index = "seatunnel-${age}"
        }
    }


CDC (Change Data Capture) event

    sink {
        Easysearch {
            hosts = ["localhost:9200"]
            index = "seatunnel-${age}"

            # cdc required options
            primary_keys = ["key1", "key2", ...]
        }
    }


SSL (Disable certificates validation)

    sink {
        Easysearch {
            hosts = ["https://localhost:9200"]
            username = "admin"
            password = "admin"

            tls_verify_certificate = false
        }
    }


SSL (Disable hostname validation)

    sink {
        Easysearch {
            hosts = ["https://localhost:9200"]
            username = "admin"
            password = "admin"

            tls_verify_hostname = false
        }
    }


SSL (Enable certificates validation)

    sink {
        Easysearch {
            hosts = ["https://localhost:9200"]
            username = "admin"
            password = "admin"

            tls_keystore_path = "${your Easysearch home}/config/certs/http.p12"
            tls_keystore_password = "${your password}"
        }
    }


SAVE_MODE

    sink {
        Easysearch {
            hosts = ["https://localhost:9200"]
            username = "admin"
            password = "admin"

            schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
            data_save_mode = "APPEND_DATA"
        }
    }