| import ChangeLog from '../changelog/connector-elasticsearch.md'; |
| |
| # Elasticsearch |
| |
| ## Description |
| |
| Output data to `Elasticsearch`. |
| |
| ## Key features |
| |
| - [ ] [exactly-once](../../concept/connector-v2-features.md) |
| - [x] [cdc](../../concept/connector-v2-features.md) |
| |
:::tip

Engine Supported

* Supported `Elasticsearch` versions: >= 2.x and <= 8.x

:::
| |
| ## Options |
| |
| name                    | type    | required | default value                |
|-------------------------|---------|----------|------------------------------|
| hosts                   | array   | yes      | -                            |
| index                   | string  | yes      | -                            |
| schema_save_mode        | string  | yes      | CREATE_SCHEMA_WHEN_NOT_EXIST |
| data_save_mode          | string  | yes      | APPEND_DATA                  |
| index_type              | string  | no       | -                            |
| primary_keys            | list    | no       | -                            |
| key_delimiter           | string  | no       | `_`                          |
| auth_type               | string  | no       | basic                        |
| username                | string  | no       | -                            |
| password                | string  | no       | -                            |
| auth.api_key_id         | string  | no       | -                            |
| auth.api_key            | string  | no       | -                            |
| auth.api_key_encoded    | string  | no       | -                            |
| max_retry_count         | int     | no       | 3                            |
| max_batch_size          | int     | no       | 10                           |
| tls_verify_certificate  | boolean | no       | true                         |
| tls_verify_hostname     | boolean | no       | true                         |
| tls_keystore_path       | string  | no       | -                            |
| tls_keystore_password   | string  | no       | -                            |
| tls_truststore_path     | string  | no       | -                            |
| tls_truststore_password | string  | no       | -                            |
| common-options          |         | no       | -                            |
| vectorization_fields    | array   | no       | -                            |
| vector_dimensions       | int     | no       | -                            |

### hosts [array]
| |
`Elasticsearch` cluster http address, in the format `host:port`, allowing multiple hosts to be specified, such as `["host1:9200", "host2:9200"]`.
| |
| ### index [string] |
| |
`Elasticsearch` `index` name. The index name may contain field-name variables, such as `seatunnel_${age}` (this requires `schema_save_mode = "IGNORE"`), and the referenced field must exist in the SeaTunnel row.
Otherwise, the name is treated as a literal index name.
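
A minimal sketch (assuming the upstream rows contain an `age` field) that routes each row to an index derived from that field:

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    # A row with age = 20 is written to the index "seatunnel_20"
    index = "seatunnel_${age}"
    # Required when the index name contains field variables
    schema_save_mode = "IGNORE"
  }
}
```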
| |
| ### index_type [string] |
| |
`Elasticsearch` index type. It is recommended not to specify this option for Elasticsearch 6 and above.
| |
| ### primary_keys [list] |
| |
Primary key fields used to generate the document `_id`; this option is required for CDC.
| |
| ### key_delimiter [string] |
| |
Delimiter for composite keys (`_` by default). For example, `$` would produce a document `_id` such as `KEY1$KEY2$KEY3`.
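
A minimal sketch combining `primary_keys` and `key_delimiter` (the index name and the fields `order_id` and `region` are illustrative):

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "orders"
    # Document _id becomes "<order_id>$<region>", e.g. "1001$eu"
    primary_keys = ["order_id", "region"]
    key_delimiter = "$"
  }
}
```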
| |
| ## Authentication |
| |
| The Elasticsearch connector supports multiple authentication methods to connect to secured Elasticsearch clusters. You can choose the appropriate authentication method based on your Elasticsearch security configuration. |
| |
| ### auth_type [enum] |
| |
| Specifies the authentication method to use. Supported values: |
| - `basic` (default): HTTP Basic Authentication using username and password |
| - `api_key`: Elasticsearch API Key authentication using separate ID and key |
| - `api_key_encoded`: Elasticsearch API Key authentication using encoded key |
| |
| If not specified, defaults to `basic` for backward compatibility. |
| |
| ### Basic Authentication |
| |
| Basic authentication uses HTTP Basic Authentication with username and password credentials. |
| |
| #### username [string] |
| |
| Username for basic authentication (x-pack username). |
| |
| #### password [string] |
| |
| Password for basic authentication (x-pack password). |
| |
| **Example:** |
| ```hocon |
| sink { |
| Elasticsearch { |
| hosts = ["https://localhost:9200"] |
| auth_type = "basic" |
| username = "elastic" |
| password = "your_password" |
| index = "my_index" |
| } |
| } |
| ``` |
| |
| ### API Key Authentication |
| |
| API Key authentication provides a more secure way to authenticate with Elasticsearch using API keys. |
| |
| #### auth.api_key_id [string] |
| |
| The API key ID generated by Elasticsearch. |
| |
| #### auth.api_key [string] |
| |
| The API key secret generated by Elasticsearch. |
| |
| #### auth.api_key_encoded [string] |
| |
| Base64 encoded API key in the format `base64(id:api_key)`. This is an alternative to specifying `auth.api_key_id` and `auth.api_key` separately. |
| |
| **Note:** You can use either `auth.api_key_id` + `auth.api_key` OR `auth.api_key_encoded`, but not both. |
| |
| **Example with separate ID and key:** |
| ```hocon |
| sink { |
| Elasticsearch { |
| hosts = ["https://localhost:9200"] |
| auth_type = "api_key" |
| auth.api_key_id = "your_api_key_id" |
| auth.api_key = "your_api_key_secret" |
| index = "my_index" |
| } |
| } |
| ``` |
| |
| **Example with encoded key:** |
| ```hocon |
| sink { |
| Elasticsearch { |
| hosts = ["https://localhost:9200"] |
| auth_type = "api_key_encoded" |
| auth.api_key_encoded = "eW91cl9hcGlfa2V5X2lkOnlvdXJfYXBpX2tleV9zZWNyZXQ=" |
| index = "my_index" |
| } |
| } |
| ``` |
| |
| |
| |
| ### max_retry_count [int] |
| |
The maximum number of retries for one bulk request.
| |
### vectorization_fields [array]

Fields to be written as vector embeddings; see the vector-field writing example below.

### vector_dimensions [int]

The dimension of the embedding vectors.
| |
| ### max_batch_size [int] |
| |
The maximum number of documents in one bulk request.
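
As a hedged sketch, batching and retry behavior can be tuned together (the values below are illustrative, not recommendations):

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "my_index"
    # Send up to 1000 documents per bulk request
    max_batch_size = 1000
    # Retry a failed bulk request up to 5 times
    max_retry_count = 5
  }
}
```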
| |
### tls_verify_certificate [boolean]

Enable certificate validation for HTTPS endpoints.

### tls_verify_hostname [boolean]

Enable hostname validation for HTTPS endpoints.
| |
| ### tls_keystore_path [string] |
| |
| The path to the PEM or JKS key store. This file must be readable by the operating system user running SeaTunnel. |
| |
| ### tls_keystore_password [string] |
| |
The password for the specified key store.
| |
| ### tls_truststore_path [string] |
| |
| The path to PEM or JKS trust store. This file must be readable by the operating system user running SeaTunnel. |
| |
| ### tls_truststore_password [string] |
| |
The password for the specified trust store.
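
A minimal sketch of configuring a custom trust store (the path and password are illustrative placeholders):

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"

    # Illustrative placeholders; point these at your actual trust store
    tls_truststore_path = "/path/to/truststore.jks"
    tls_truststore_password = "your_truststore_password"
  }
}
```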
| |
| ### common options |
| |
Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details.
| |
### schema_save_mode

Before the synchronization task starts, choose how to handle the existing table structure on the target side.
Option introduction:
`RECREATE_SCHEMA`: Creates the table when it does not exist; drops and recreates it when it already exists.
`CREATE_SCHEMA_WHEN_NOT_EXIST`: Creates the table when it does not exist; skips creation when it already exists.
`ERROR_WHEN_SCHEMA_NOT_EXIST`: Reports an error when the table does not exist.
`IGNORE`: Ignores the table.
| |
### data_save_mode

Before the synchronization task starts, choose how to handle existing data on the target side.
Option introduction:
`DROP_DATA`: Preserves the database structure and deletes the data.
`APPEND_DATA`: Preserves the database structure and the data.
`ERROR_WHEN_DATA_EXISTS`: Reports an error when data already exists.
| |
| ## Examples |
| |
| Simple |
| |
```hocon
| sink { |
| Elasticsearch { |
| hosts = ["localhost:9200"] |
| index = "seatunnel-${age}" |
| schema_save_mode="IGNORE" |
| } |
| } |
| |
| ``` |
| Multi-table writing |
| |
```hocon
| sink { |
| Elasticsearch { |
| hosts = ["localhost:9200"] |
| index = "${table_name}" |
| schema_save_mode="IGNORE" |
| } |
| } |
| ``` |
| |
Vector-field writing
| |
```hocon
| sink { |
| Elasticsearch { |
| hosts = ["localhost:9200"] |
| index = "${table_name}" |
| schema_save_mode="IGNORE" |
| vectorization_fields = ["review_embedding"] |
| vector_dimensions = 1024 |
| } |
| } |
| ``` |
| |
CDC (Change Data Capture) event
| |
```hocon
| sink { |
| Elasticsearch { |
| hosts = ["localhost:9200"] |
| index = "seatunnel-${age}" |
| schema_save_mode="IGNORE" |
| # cdc required options |
| primary_keys = ["key1", "key2", ...] |
| } |
| } |
| |
| ``` |
CDC (Change Data Capture) event with multi-table writing
| |
```hocon
| sink { |
| Elasticsearch { |
| hosts = ["localhost:9200"] |
| index = "${table_name}" |
| schema_save_mode="IGNORE" |
| primary_keys = ["${primary_key}"] |
| } |
| } |
| ``` |
| |
| SSL (Disable certificates validation) |
| |
| ```hocon |
| sink { |
| Elasticsearch { |
| hosts = ["https://localhost:9200"] |
| username = "elastic" |
| password = "elasticsearch" |
| |
| tls_verify_certificate = false |
| } |
| } |
| ``` |
| |
| SSL (Disable hostname validation) |
| |
| ```hocon |
| sink { |
| Elasticsearch { |
| hosts = ["https://localhost:9200"] |
| username = "elastic" |
| password = "elasticsearch" |
| |
| tls_verify_hostname = false |
| } |
| } |
| ``` |
| |
| SSL (Enable certificates validation) |
| |
| ```hocon |
| sink { |
| Elasticsearch { |
| hosts = ["https://localhost:9200"] |
| username = "elastic" |
| password = "elasticsearch" |
| |
| tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12" |
| tls_keystore_password = "${your password}" |
| } |
| } |
| ``` |
| |
| SAVE_MODE |
| |
| ```hocon |
| sink { |
| Elasticsearch { |
| hosts = ["https://localhost:9200"] |
| username = "elastic" |
| password = "elasticsearch" |
| |
| schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" |
| data_save_mode = "APPEND_DATA" |
| } |
| } |
| ``` |
| |
| ### Schema Evolution |
| |
CDC collection supports a limited set of schema changes. The currently supported schema changes include:
| |
| * Adding columns. |
| |
| ```hocon |
| env { |
| # You can set engine configuration here |
| parallelism = 5 |
| job.mode = "STREAMING" |
| checkpoint.interval = 5000 |
| read_limit.bytes_per_second = 7000000 |
| read_limit.rows_per_second = 400 |
| } |
| |
| source { |
| MySQL-CDC { |
| server-id = 5652-5657 |
| username = "st_user_source" |
| password = "mysqlpw" |
| table-names = ["shop.products"] |
| url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" |
| schema-changes.enabled = true |
| } |
| } |
| |
| sink { |
| Elasticsearch { |
| hosts = ["https://elasticsearch:9200"] |
| username = "elastic" |
| password = "elasticsearch" |
| tls_verify_certificate = false |
| tls_verify_hostname = false |
| index = "schema_change_index" |
| index_type = "_doc" |
| "schema_save_mode" = "CREATE_SCHEMA_WHEN_NOT_EXIST" |
| "data_save_mode" = "APPEND_DATA" |
| } |
| } |
| ``` |
| |
| ## Changelog |
| |
| <ChangeLog /> |