title: Elasticsearch sidebar_position: 4

import {siteVariables} from ‘../../version’;

概览

Elasticsearch Load 节点允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch Load 节点。

连接器可以工作在 Upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。

如果 DDL 中没有定义主键,那么连接器只能工作在 Append 模式,只能与外部系统交换 INSERT 消息。

支持的版本

Load 节点版本
elasticsearchElasticsearch: 5.x, 6.x, 7.x

依赖

为了设置 Elasticsearch Load 节点,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connectors JAR 包的 SQL 客户端的两个项目的依赖关系信息。

  • Elasticsearch 6
  • Elasticsearch 7

如何创建一个 Elasticsearch Load 节点

SQL API 用法

下面列子展示了如何利用Flink SQL创建一个 Elasticsearch Load 节点:

CREATE TABLE myUserTable (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users'
);

InLong Dashboard 用法

TODO: 将在未来支持这个特性。

InLong Manager Client 用法

TODO: 将在未来支持这个特性。

Elasticsearch Load 节点参数

特性

Key 处理

Elasticsearch Load 节点可以根据是否定义了主键来确定是在 Upsert 模式还是 Append 模式下工作。 如果定义了主键,Elasticsearch Load 节点将以 Upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 消息的查询。 如果未定义主键,Elasticsearch Load 节点将以 Append 模式工作,该模式只能消费包含 INSERT 消息的查询。

在 Elasticsearch Load 节点中,主键用于计算 Elasticsearch 的文档 Id,文档 Id 为最多 512 字节且不包含空格的字符串。 Elasticsearch Load 节点通过使用 document-id.key-delimiter 指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 Id 字符串。 某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTESROWARRAYMAP 等。 如果未指定主键,Elasticsearch 将自动生成文档 Id。

有关 PRIMARY KEY 语法的更多详细信息,请参见 CREATE TABLE DDL

动态索引

Elasticsearch Load 节点同时支持静态索引和动态索引。

如果你想使用静态索引,则 index 选项值应为纯字符串,例如 'myusers',所有记录都将被写入到 “myusers” 索引中。

如果你想使用动态索引,你可以使用 {field_name} 来引用记录中的字段值来动态生成目标索引。 你也可以使用 '{field_name|date_format_string}'TIMESTAMP/DATE/TIME 类型的字段值转换为 date_format_string 指定的格式。 date_format_string 与 Java 的 DateTimeFormatter 兼容。 例如,如果选项值设置为 'myusers-{'{log_ts|yyyy-MM-dd}'}',则 log_ts 字段值为 2020-03-27 12:25:55 的记录将被写入到 “myusers-2020-03-27” 索引中。

数据类型映射