import ChangeLog from '../changelog/connector-elasticsearch.md';

# Elasticsearch

## Description

Output data to Elasticsearch.

## Key features

:::tip

Engine Supported

- The supported Elasticsearch version is >= 2.x and <= 8.x.

:::

## Options

| name                     | type    | required | default value                |
|--------------------------|---------|----------|------------------------------|
| hosts                    | array   | yes      | -                            |
| index                    | string  | yes      | -                            |
| schema_save_mode         | string  | yes      | CREATE_SCHEMA_WHEN_NOT_EXIST |
| data_save_mode           | string  | yes      | APPEND_DATA                  |
| index_type               | string  | no       | -                            |
| primary_keys             | list    | no       | -                            |
| key_delimiter            | string  | no       | _                            |
| auth_type                | string  | no       | basic                        |
| username                 | string  | no       | -                            |
| password                 | string  | no       | -                            |
| auth.api_key_id          | string  | no       | -                            |
| auth.api_key             | string  | no       | -                            |
| auth.api_key_encoded     | string  | no       | -                            |
| max_retry_count          | int     | no       | 3                            |
| max_batch_size           | int     | no       | 10                           |
| tls_verify_certificate   | boolean | no       | true                         |
| tls_verify_hostname      | boolean | no       | true                         |
| tls_keystore_path        | string  | no       | -                            |
| tls_keystore_password    | string  | no       | -                            |
| tls_truststore_path      | string  | no       | -                            |
| tls_truststore_password  | string  | no       | -                            |
| common-options           |         | no       | -                            |
| vectorization_fields     | array   | no       | -                            |
| vector_dimensions        | int     | no       | -                            |

### hosts [array]

Elasticsearch cluster HTTP address, in the format `host:port`; multiple hosts may be specified, e.g. `["host1:9200", "host2:9200"]`.

### index [string]

Elasticsearch index name. The index name may contain field-name variables, such as `seatunnel_${age}` (this requires `schema_save_mode = "IGNORE"`), and the referenced field must exist in the SeaTunnel row; otherwise the value is treated as a literal index name.

### index_type [string]

Elasticsearch index type. It is recommended not to set this option on Elasticsearch 6 and above.
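
Where an older cluster still requires a mapping type, the option can be set explicitly. A minimal sketch; the index and type names are illustrative:

```hocon
sink {
    Elasticsearch {
        hosts = ["localhost:9200"]
        index = "my_index"
        # per the recommendation above, omit this on Elasticsearch 6 and above
        index_type = "st"
    }
}
```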

### primary_keys [list]

Primary key fields used to generate the document `_id`. This option is required for CDC.

### key_delimiter [string]

Delimiter for composite keys (`_` by default); e.g., `$` would result in a document `_id` of `KEY1$KEY2$KEY3`.
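
Putting `primary_keys` and `key_delimiter` together: a minimal sketch of a sink that builds each document `_id` from two key fields joined by `$` (the field names `key1` and `key2` are illustrative):

```hocon
sink {
    Elasticsearch {
        hosts = ["localhost:9200"]
        index = "my_index"
        # _id becomes "<key1 value>$<key2 value>" for every row
        primary_keys = ["key1", "key2"]
        key_delimiter = "$"
    }
}
```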

## Authentication

The Elasticsearch connector supports multiple authentication methods to connect to secured Elasticsearch clusters. You can choose the appropriate authentication method based on your Elasticsearch security configuration.

### auth_type [enum]

Specifies the authentication method to use. Supported values:

- `basic` (default): HTTP Basic Authentication using username and password
- `api_key`: Elasticsearch API Key authentication using a separate ID and key
- `api_key_encoded`: Elasticsearch API Key authentication using an encoded key

If not specified, it defaults to `basic` for backward compatibility.

### Basic Authentication

Basic authentication uses HTTP Basic Authentication with username and password credentials.

#### username [string]

Username for basic authentication (x-pack username).

#### password [string]

Password for basic authentication (x-pack password).

Example:

```hocon
sink {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        auth_type = "basic"
        username = "elastic"
        password = "your_password"
        index = "my_index"
    }
}
```

### API Key Authentication

API Key authentication provides a more secure way to authenticate with Elasticsearch using API keys.

#### auth.api_key_id [string]

The API key ID generated by Elasticsearch.

#### auth.api_key [string]

The API key secret generated by Elasticsearch.

#### auth.api_key_encoded [string]

Base64-encoded API key in the format `base64(id:api_key)`. This is an alternative to specifying `auth.api_key_id` and `auth.api_key` separately.

**Note**: You can use either `auth.api_key_id` + `auth.api_key` OR `auth.api_key_encoded`, but not both.

Example with separate ID and key:

```hocon
sink {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        auth_type = "api_key"
        auth.api_key_id = "your_api_key_id"
        auth.api_key = "your_api_key_secret"
        index = "my_index"
    }
}
```

Example with encoded key:

```hocon
sink {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        auth_type = "api_key_encoded"
        auth.api_key_encoded = "eW91cl9hcGlfa2V5X2lkOnlvdXJfYXBpX2tleV9zZWNyZXQ="
        index = "my_index"
    }
}
```

### max_retry_count [int]

The maximum number of retries for one bulk request.

### vectorization_fields [array]

The fields that contain embeddings to be written as vector fields.

### vector_dimensions [int]

The dimension of the embedding vectors.

### max_batch_size [int]

The maximum number of documents in one bulk request.
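
A sketch that tunes both bulk options together; the values are illustrative, not recommendations:

```hocon
sink {
    Elasticsearch {
        hosts = ["localhost:9200"]
        index = "my_index"
        max_batch_size = 1000  # send up to 1000 documents per bulk request
        max_retry_count = 5    # retry a failed bulk request up to 5 times
    }
}
```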

### tls_verify_certificate [boolean]

Enable certificate validation for HTTPS endpoints.

### tls_verify_hostname [boolean]

Enable hostname validation for HTTPS endpoints.

### tls_keystore_path [string]

The path to the PEM or JKS key store. This file must be readable by the operating system user running SeaTunnel.

### tls_keystore_password [string]

The password for the specified key store.

### tls_truststore_path [string]

The path to the PEM or JKS trust store. This file must be readable by the operating system user running SeaTunnel.

### tls_truststore_password [string]

The password for the specified trust store.
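
The SSL examples further below show the key store options; for the trust-store variant, a minimal sketch (the path and password are placeholders):

```hocon
sink {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        username = "elastic"
        password = "elasticsearch"

        # validate the server certificate against a custom trust store
        tls_truststore_path = "/path/to/truststore.jks"
        tls_truststore_password = "your_truststore_password"
    }
}
```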

### common options

Sink plugin common parameters; please refer to Sink Common Options for details.

### schema_save_mode

Before the synchronization task starts, choose how to handle the existing index structure on the target side. Options:

- `RECREATE_SCHEMA`: create the index when it does not exist; drop and recreate it when it already exists
- `CREATE_SCHEMA_WHEN_NOT_EXIST`: create the index when it does not exist; skip creation when it already exists
- `ERROR_WHEN_SCHEMA_NOT_EXIST`: report an error when the index does not exist
- `IGNORE`: ignore the handling of the index

### data_save_mode

Before the synchronization task starts, choose how to handle data that already exists on the target side. Options:

- `DROP_DATA`: preserve the index structure and delete the data
- `APPEND_DATA`: preserve the index structure and preserve the data
- `ERROR_WHEN_DATA_EXISTS`: report an error when data already exists
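
For example, a truncate-and-load job would keep an existing index but clear its documents before writing. A minimal sketch combining the two save modes under that assumption:

```hocon
sink {
    Elasticsearch {
        hosts = ["localhost:9200"]
        index = "my_index"
        # keep the index mapping if it exists...
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        # ...but delete existing documents before appending new data
        data_save_mode = "DROP_DATA"
    }
}
```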

## Examples

### Simple

```hocon
sink {
    Elasticsearch {
        hosts = ["localhost:9200"]
        index = "seatunnel-${age}"
        schema_save_mode = "IGNORE"
    }
}
```

### Multi-table writing

```hocon
sink {
    Elasticsearch {
        hosts = ["localhost:9200"]
        index = "${table_name}"
        schema_save_mode = "IGNORE"
    }
}
```

### Vector-field writing

```hocon
sink {
    Elasticsearch {
        hosts = ["localhost:9200"]
        index = "${table_name}"
        schema_save_mode = "IGNORE"
        vectorization_fields = ["review_embedding"]
        vector_dimensions = 1024
    }
}
```

### CDC (Change Data Capture) event

```hocon
sink {
    Elasticsearch {
        hosts = ["localhost:9200"]
        index = "seatunnel-${age}"
        schema_save_mode = "IGNORE"
        # cdc required options
        primary_keys = ["key1", "key2", ...]
    }
}
```

### CDC (Change Data Capture) event with multi-table writing

```hocon
sink {
    Elasticsearch {
        hosts = ["localhost:9200"]
        index = "${table_name}"
        schema_save_mode = "IGNORE"
        primary_keys = ["${primary_key}"]
    }
}
```

### SSL (Disable certificate validation)

```hocon
sink {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        username = "elastic"
        password = "elasticsearch"

        tls_verify_certificate = false
    }
}
```

### SSL (Disable hostname validation)

```hocon
sink {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        username = "elastic"
        password = "elasticsearch"

        tls_verify_hostname = false
    }
}
```

### SSL (Enable certificate validation)

```hocon
sink {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        username = "elastic"
        password = "elasticsearch"

        tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
        tls_keystore_password = "${your password}"
    }
}
```

### SAVE_MODE

```hocon
sink {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        username = "elastic"
        password = "elasticsearch"

        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        data_save_mode = "APPEND_DATA"
    }
}
```

## Schema Evolution

CDC collection supports a limited number of schema changes. The currently supported schema changes include:

- Adding columns.

```hocon
env {
  # You can set engine configuration here
  parallelism = 5
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  MySQL-CDC {
    server-id = 5652-5657
    username = "st_user_source"
    password = "mysqlpw"
    table-names = ["shop.products"]
    url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
    schema-changes.enabled = true
  }
}

sink {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false
    index = "schema_change_index"
    index_type = "_doc"
    "schema_save_mode" = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    "data_save_mode" = "APPEND_DATA"
  }
}
```

## Changelog

<ChangeLog />