import ChangeLog from '../changelog/connector-elasticsearch.md';
Output data to Elasticsearch.
:::tip
Engine Supported

Supported Elasticsearch version is >= 2.x and <= 8.x.
:::
| name | type | required | default value |
|---|---|---|---|
| hosts | array | yes | - |
| index | string | yes | - |
| schema_save_mode | string | yes | CREATE_SCHEMA_WHEN_NOT_EXIST |
| data_save_mode | string | yes | APPEND_DATA |
| index_type | string | no | - |
| primary_keys | list | no | - |
| key_delimiter | string | no | `_` |
| auth_type | string | no | basic |
| username | string | no | - |
| password | string | no | - |
| auth.api_key_id | string | no | - |
| auth.api_key | string | no | - |
| auth.api_key_encoded | string | no | - |
| max_retry_count | int | no | 3 |
| max_batch_size | int | no | 10 |
| tls_verify_certificate | boolean | no | true |
| tls_verify_hostname | boolean | no | true |
| tls_keystore_path | string | no | - |
| tls_keystore_password | string | no | - |
| tls_truststore_path | string | no | - |
| tls_truststore_password | string | no | - |
| common-options | | no | - |
| vectorization_fields | array | no | - |
| vector_dimensions | int | no | - |
### hosts [array]

Elasticsearch cluster http address, in the format `host:port`; multiple hosts may be specified, such as `["host1:9200", "host2:9200"]`.
### index [string]

Elasticsearch index name. The index name supports variables referencing field names, such as `seatunnel_${age}` (requires `schema_save_mode = "IGNORE"`), and the referenced field must appear in the SeaTunnel row. If it does not, the value is treated as a normal index name.
### index_type [string]

Elasticsearch index type; it is recommended not to specify this in Elasticsearch 6 and above.
### primary_keys [list]

Primary key fields used to generate the document `_id`; this option is required for CDC.
### key_delimiter [string]

Delimiter for composite keys (`_` by default); e.g., `$` would produce the document `_id` `"KEY1$KEY2$KEY3"`.
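To illustrate, the `_id` composition described above can be sketched in Python. The function and row layout are illustrative only, not the connector's actual code:

```python
# Sketch: how a composite document _id is assembled from primary_keys
# using key_delimiter. Illustrative only; not the connector's source.
def build_doc_id(row: dict, primary_keys: list, key_delimiter: str = "_") -> str:
    # Join the values of the primary key fields, in order, with the delimiter.
    return key_delimiter.join(str(row[k]) for k in primary_keys)

row = {"KEY1": "k1", "KEY2": "k2", "KEY3": "k3"}
print(build_doc_id(row, ["KEY1", "KEY2", "KEY3"], "$"))  # k1$k2$k3
```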
### auth_type [string]

The Elasticsearch connector supports multiple authentication methods for connecting to secured Elasticsearch clusters. Choose the method that matches your Elasticsearch security configuration.

Specifies the authentication method to use. Supported values:

- `basic` (default): HTTP Basic Authentication using username and password
- `api_key`: Elasticsearch API Key authentication using a separate ID and key
- `api_key_encoded`: Elasticsearch API Key authentication using an encoded key

If not specified, defaults to `basic` for backward compatibility.
#### Basic Authentication

Basic authentication uses HTTP Basic Authentication with username and password credentials.

- `username`: Username for basic authentication (x-pack username).
- `password`: Password for basic authentication (x-pack password).
Example:

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    auth_type = "basic"
    username = "elastic"
    password = "your_password"
    index = "my_index"
  }
}
```
#### API Key Authentication

API Key authentication provides a more secure way to authenticate with Elasticsearch using API keys.

- `auth.api_key_id`: The API key ID generated by Elasticsearch.
- `auth.api_key`: The API key secret generated by Elasticsearch.
- `auth.api_key_encoded`: Base64-encoded API key in the format `base64(id:api_key)`. This is an alternative to specifying `auth.api_key_id` and `auth.api_key` separately.

Note: You can use either `auth.api_key_id` + `auth.api_key` OR `auth.api_key_encoded`, but not both.
Example with separate ID and key:
```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    auth_type = "api_key"
    auth.api_key_id = "your_api_key_id"
    auth.api_key = "your_api_key_secret"
    index = "my_index"
  }
}
```
Example with encoded key:
```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    auth_type = "api_key_encoded"
    auth.api_key_encoded = "eW91cl9hcGlfa2V5X2lkOnlvdXJfYXBpX2tleV9zZWNyZXQ="
    index = "my_index"
  }
}
```
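If you need to produce the encoded value yourself, `base64(id:api_key)` can be computed as follows. The id and secret here are the placeholder values from the example above, not real credentials:

```python
import base64

# Encode "id:api_key" as Base64 for auth.api_key_encoded.
# Placeholder credentials matching the example config above.
api_key_id = "your_api_key_id"
api_key_secret = "your_api_key_secret"
encoded = base64.b64encode(f"{api_key_id}:{api_key_secret}".encode()).decode()
print(encoded)  # eW91cl9hcGlfa2V5X2lkOnlvdXJfYXBpX2tleV9zZWNyZXQ=
```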
### max_retry_count [int]

Maximum retry count for one bulk request.

### max_batch_size [int]

Maximum document count per bulk request.

### vectorization_fields [array]

Fields to be converted to embeddings.

### vector_dimensions [int]

Dimensions of the embeddings.
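As a sketch of how the retry and batching options fit into a sink definition (the values here are illustrative, not recommendations):

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "my_index"
    # retry each failed bulk request up to 5 times (default: 3)
    max_retry_count = 5
    # group up to 500 documents into one bulk request (default: 10)
    max_batch_size = 500
  }
}
```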
### tls_verify_certificate [boolean]

Enable certificate validation for HTTPS endpoints.

### tls_verify_hostname [boolean]

Enable hostname validation for HTTPS endpoints.

### tls_keystore_path [string]

The path to the PEM or JKS key store. This file must be readable by the operating system user running SeaTunnel.

### tls_keystore_password [string]

The key password for the specified key store.

### tls_truststore_path [string]

The path to the PEM or JKS trust store. This file must be readable by the operating system user running SeaTunnel.

### tls_truststore_password [string]

The key password for the specified trust store.
### common options

Sink plugin common parameters; please refer to Sink Common Options for details.
### schema_save_mode [string]

Before the synchronization task starts, choose how to handle an existing table structure on the target side. Options:

- `RECREATE_SCHEMA`: Create the index when it does not exist; drop and recreate it when it already exists.
- `CREATE_SCHEMA_WHEN_NOT_EXIST`: Create the index when it does not exist; skip when it already exists.
- `ERROR_WHEN_SCHEMA_NOT_EXIST`: Report an error when the index does not exist.
- `IGNORE`: Ignore handling of the table.
### data_save_mode [string]

Before the synchronization task starts, choose how to handle existing data on the target side. Options:

- `DROP_DATA`: Preserve the index structure but delete existing data.
- `APPEND_DATA`: Preserve both the index structure and existing data.
- `ERROR_WHEN_DATA_EXISTS`: Report an error when data already exists.
### Simple

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-${age}"
    schema_save_mode = "IGNORE"
  }
}
```
### Multi-table writing

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "${table_name}"
    schema_save_mode = "IGNORE"
  }
}
```
### Vector-field writing

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "${table_name}"
    schema_save_mode = "IGNORE"
    vectorization_fields = ["review_embedding"]
    vector_dimensions = 1024
  }
}
```
### CDC (Change Data Capture) event

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-${age}"
    schema_save_mode = "IGNORE"
    # cdc required options
    primary_keys = ["key1", "key2", ...]
  }
}
```
### CDC (Change Data Capture) event multi-table writing

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "${table_name}"
    schema_save_mode = "IGNORE"
    primary_keys = ["${primary_key}"]
  }
}
```
### SSL (Disable certificate validation)

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
  }
}
```
### SSL (Disable hostname validation)

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_hostname = false
  }
}
```
### SSL (Enable certificate validation)

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
    tls_keystore_password = "${your password}"
  }
}
```
### SAVE_MODE

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}
```
CDC collection supports a limited number of schema changes. The currently supported schema changes include:
```hocon
env {
  # You can set engine configuration here
  parallelism = 5
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  MySQL-CDC {
    server-id = 5652-5657
    username = "st_user_source"
    password = "mysqlpw"
    table-names = ["shop.products"]
    url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
    schema-changes.enabled = true
  }
}

sink {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false
    index = "schema_change_index"
    index_type = "_doc"
    "schema_save_mode" = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    "data_save_mode" = "APPEND_DATA"
  }
}
```