import ChangeLog from '../changelog/connector-easysearch.md';

INFINI Easysearch

Support Those Engines

Spark
Flink
SeaTunnel Zeta

Description

A sink plugin that sends data to INFINI Easysearch.

Using Dependency

Dependency: easysearch-client

Key features

:::tip

Engine Supported

:::

Data Type Mapping

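| Easysearch Data Type  | SeaTunnel Data Type  |
|-----------------------|----------------------|
| STRING, KEYWORD, TEXT | STRING               |
| BOOLEAN               | BOOLEAN              |
| BYTE                  | BYTE                 |
| SHORT                 | SHORT                |
| INTEGER               | INT                  |
| LONG                  | LONG                 |
| FLOAT, HALF_FLOAT     | FLOAT                |
| DOUBLE                | DOUBLE               |
| Date                  | LOCAL_DATE_TIME_TYPE |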

Sink Options

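| name                    | type    | required | default value                |
|-------------------------|---------|----------|------------------------------|
| hosts                   | array   | yes      | -                            |
| index                   | string  | yes      | -                            |
| primary_keys            | list    | no       | -                            |
| key_delimiter           | string  | no       | `_`                          |
| username                | string  | no       | -                            |
| password                | string  | no       | -                            |
| max_retry_count         | int     | no       | 3                            |
| max_batch_size          | int     | no       | 10                           |
| tls_verify_certificate  | boolean | no       | true                         |
| tls_verify_hostname     | boolean | no       | true                         |
| tls_keystore_path       | string  | no       | -                            |
| tls_keystore_password   | string  | no       | -                            |
| tls_truststore_path     | string  | no       | -                            |
| tls_truststore_password | string  | no       | -                            |
| schema_save_mode        | enum    | no       | CREATE_SCHEMA_WHEN_NOT_EXIST |
| data_save_mode          | enum    | no       | APPEND_DATA                  |
| common-options          |         | no       | -                            |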

hosts [array]

The HTTP address of the INFINI Easysearch cluster, in the format host:port. Multiple hosts can be specified, for example ["host1:9200", "host2:9200"].

index [string]

The INFINI Easysearch index name. The name can contain field-name variables, such as seatunnel_${age}, in which case the referenced field must exist in the SeaTunnel row. Otherwise, the name is treated as a plain index name.
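As a minimal sketch of the variable form, the fragment below assumes that every upstream row carries a field named age (the field name is taken from the example above, and the host is a placeholder). Under that assumption, a row whose age is 18 would be written to the index seatunnel_18.

```hocon
sink {
    Easysearch {
        # Placeholder address; replace with your own cluster
        hosts = ["localhost:9200"]

        # ${age} is resolved per row from the "age" field,
        # which is assumed to exist in the SeaTunnel row
        index = "seatunnel_${age}"
    }
}
```

Because the placeholder sits inside a quoted string, it is passed to the connector as written and is not treated as a substitution by the config format itself.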

primary_keys [list]

The primary key fields used to generate the document _id. This option is required for CDC.

key_delimiter [string]

The delimiter for composite keys ("_" by default). For example, "$" would result in the document _id "KEY1$KEY2$KEY3".
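The two key options are typically set together. The sketch below uses hypothetical field names (key1, key2, key3) and a plain index name chosen for illustration. Following the description above, a row whose key fields hold KEY1, KEY2, and KEY3 would get the document _id KEY1$KEY2$KEY3.

```hocon
sink {
    Easysearch {
        hosts = ["localhost:9200"]
        index = "seatunnel"

        # Hypothetical key fields; use the fields that identify a row in your data
        primary_keys = ["key1", "key2", "key3"]

        # Joins the key values into the document _id (defaults to "_" when omitted)
        key_delimiter = "$"
    }
}
```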

username [string]

The security username used to authenticate with the cluster.

password [string]

The security password used to authenticate with the cluster.

max_retry_count [int]

The maximum number of times a single bulk request is attempted.

max_batch_size [int]

The maximum number of documents in a single bulk request.
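For illustration, the fragment below overrides both bulk-related defaults (3 and 10). The values 5 and 1000 are arbitrary examples rather than recommendations; suitable settings depend on document size and cluster capacity.

```hocon
sink {
    Easysearch {
        hosts = ["localhost:9200"]
        index = "seatunnel"

        # Example values only; the defaults are 3 and 10
        max_retry_count = 5
        max_batch_size = 1000
    }
}
```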

tls_verify_certificate [boolean]

Enables certificate validation for HTTPS endpoints.

tls_verify_hostname [boolean]

Enables hostname validation for HTTPS endpoints.

tls_keystore_path [string]

The path to the PEM or JKS key store. This file must be readable by the operating system user running SeaTunnel.

tls_keystore_password [string]

The password for the specified key store.

tls_truststore_path [string]

The path to the PEM or JKS trust store. This file must be readable by the operating system user running SeaTunnel.

tls_truststore_password [string]

The password for the specified trust store.
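A trust store is configured in the same way as a key store. The sketch below keeps both validation options at their default of true and points the sink at a trust store; the path and the password placeholder are hypothetical and must be replaced with your own values.

```hocon
sink {
    Easysearch {
        hosts = ["https://localhost:9200"]
        index = "seatunnel"
        username = "admin"
        password = "admin"

        # Hypothetical location; the file must be readable by the user running SeaTunnel
        tls_truststore_path = "/path/to/truststore.jks"
        tls_truststore_password = "${your password}"
    }
}
```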

schema_save_mode [enum]

Choose how to handle the target-side schema before starting the synchronization task:

  • RECREATE_SCHEMA: Creates the table if it doesn't exist, and deletes and recreates it if it does.
  • CREATE_SCHEMA_WHEN_NOT_EXIST: Creates the table if it doesn't exist, skips creation if it does.
  • ERROR_WHEN_SCHEMA_NOT_EXIST: Throws an error if the table doesn't exist.
  • IGNORE: Ignores schema handling.

data_save_mode [enum]

Choose how to handle the target-side data before starting the synchronization task:

  • DROP_DATA: Preserves the database structure and deletes the data.
  • APPEND_DATA: Preserves the database structure and the data.
  • ERROR_WHEN_DATA_EXISTS: Reports an error when data exists.
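The two save modes can be combined. As one possible combination, the sketch below requires the target to exist already and clears its data before the synchronization task starts; the index name is an assumption made for illustration.

```hocon
sink {
    Easysearch {
        hosts = ["localhost:9200"]
        index = "seatunnel"

        # Fail fast if the target does not exist
        schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"

        # Keep the structure, delete existing data before writing
        data_save_mode = "DROP_DATA"
    }
}
```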

common options

Common parameters for sink plugins. Refer to Sink Common Options for details.

Examples

Simple

sink {
    Easysearch {
        hosts = ["localhost:9200"]
        index = "seatunnel-${age}"
    }
}

CDC (Change Data Capture) event

sink {
    Easysearch {
        hosts = ["localhost:9200"]
        index = "seatunnel-${age}"

        # cdc required options
        primary_keys = ["key1", "key2", ...]
    }
}

SSL (Disable certificate validation)

sink {
    Easysearch {
        hosts = ["https://localhost:9200"]
        username = "admin"
        password = "admin"

        tls_verify_certificate = false
    }
}

SSL (Disable hostname validation)

sink {
    Easysearch {
        hosts = ["https://localhost:9200"]
        username = "admin"
        password = "admin"

        tls_verify_hostname = false
    }
}

SSL (Enable certificate validation)

sink {
    Easysearch {
        hosts = ["https://localhost:9200"]
        username = "admin"
        password = "admin"

        tls_keystore_path = "${your Easysearch home}/config/certs/http.p12"
        tls_keystore_password = "${your password}"
    }
}

SAVE_MODE

sink {
    Easysearch {
        hosts = ["https://localhost:9200"]
        username = "admin"
        password = "admin"

        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        data_save_mode = "APPEND_DATA"
    }
}

Changelog