blob: 643b584cc3e8044a90a5d93389c852d392570487 [file] [log] [blame] [view]
import ChangeLog from '../changelog/connector-kafka.md';
# Kafka
> Kafka 源连接器
## 支持以下引擎
> Spark<br/>
> Flink<br/>
> Seatunnel Zeta<br/>
## 主要功能
- [x] [批处理](../../concept/connector-v2-features.md)
- [x] [流处理](../../concept/connector-v2-features.md)
- [x] [精确一次](../../concept/connector-v2-features.md)
- [ ] [列投影](../../concept/connector-v2-features.md)
- [x] [并行度](../../concept/connector-v2-features.md)
- [ ] [支持用户定义拆分](../../concept/connector-v2-features.md)
## 描述
用于 Apache Kafka 的源连接器。
## 支持的数据源信息
使用 Kafka 连接器需要以下依赖项。
可以通过 install-plugin.sh 下载或从 Maven 中央仓库获取。
| 数据源 | 支持的版本 | Maven 下载链接 |
|-------|-------|-------------------------------------------------------------------------------|
| Kafka | 通用版本 | [下载](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-kafka) |
## 源选项
| 名称 | 类型 | 是否必填 | 默认值 | 描述 |
|-------------------------------------|-------------------------------------|------|------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| topic | String | | - | 使用表作为数据源时要读取数据的主题名称。它也支持通过逗号分隔的多个主题列表,例如 'topic-1,topic-2' |
| table_list | Map | | - | 主题列表配置,你可以同时配置一个 `table_list` 和一个 `topic` |
| bootstrap.servers | String | | - | 逗号分隔的 Kafka brokers 列表。 |
| pattern | Boolean | | false | 如果 `pattern` 设置为 `true`,则会使用指定的正则表达式匹配并订阅主题。 |
| consumer.group | String | | SeaTunnel-Consumer-Group | `Kafka 消费者组 ID`,用于区分不同的消费者组。 |
| commit_on_checkpoint | Boolean | | true | 如果为 true,消费者的偏移量将会定期在后台提交。 |
| poll.timeout | Long | | 10000 | kafka主动拉取时间间隔(毫秒)。 |
| kafka.config | Map | | - | 除了上述必要参数外,用户还可以指定多个非强制的消费者客户端参数,覆盖 [Kafka 官方文档](https://kafka.apache.org/documentation.html#consumerconfigs) 中指定的所有消费者参数。 |
| schema | Config | | - | 数据结构,包括字段名称和字段类型。 |
| format | String | | json | 数据格式。默认格式为 json。可选格式包括 text, canal_json, debezium_json, ogg_json, maxwell_json, avro , protobufnative。默认字段分隔符为 ", "。如果自定义分隔符,添加 "field_delimiter" 选项。如果使用 canal 格式,请参考 [canal-json](../formats/canal-json.md) 了解详细信息。如果使用 debezium 格式,请参考 [debezium-json](../formats/debezium-json.md)。一些Format的详细信息请参考 [formats](../formats) |
| format_error_handle_way | String | | fail | 数据格式错误的处理方式。默认值为 fail,可选值为 fail skip。当选择 fail 时,数据格式错误将阻塞并抛出异常。当选择 skip 时,数据格式错误将跳过此行数据。 |
| debezium_record_table_filter | Config | | - | 用于过滤 debezium 格式的数据,仅当格式设置为 `debezium_json` 时使用。请参阅下面的 `debezium_record_table_filter` |
| field_delimiter | String | | , | 自定义数据格式的字段分隔符。 |
| start_mode | StartMode[earliest],[group_offsets] | | group_offsets | 消费者的初始消费模式。 |
| start_mode.offsets | Config | | - | 用于 specific_offsets 消费模式的偏移量。 |
| start_mode.timestamp | Long | | - | 用于 "timestamp" 消费模式的时间。 |
| start_mode.end_timestamp | Long | | - | 用于 "timestamp" 消费模式的结束时间,只支持批模式 |
| partition-discovery.interval-millis | Long | | -1 | 动态发现主题和分区的间隔时间。 |
| ignore_no_leader_partition | Boolean | | false | 是否忽略没有 leader 的分区。如果设置为 true,在分区发现过程中将跳过没有 leader 的分区。如果设置为 false(默认值),连接器将包含所有分区,无论 leader 状态如何。这在处理可能存在临时 leader 问题的 Kafka 集群时很有用。 |
| common-options | | | - | 源插件的常见参数,详情请参考 [Source Common Options](../source-common-options.md)。 |
| protobuf_message_name | String | | - | 当格式设置为 protobuf 时有效,指定消息名称。 |
| protobuf_schema | String | | - | 当格式设置为 protobuf 时有效,指定 Schema 定义。 |
| reader_cache_queue_size | Integer | | 1024 | Reader分片缓存队列,用于缓存分片对应的数据。占用大小取决于每个reader得到的分片量,而不是每个分片的数据量。 |
| is_native | Boolean | No | false | 支持保留record的源信息。 |
### debezium_record_table_filter
我们可以使用 `debezium_record_table_filter` 来过滤 debezium 格式的数据。配置如下:
```hocon
debezium_record_table_filter {
database_name = "test"
schema_name = "public" // null 如果不存在
table_name = "products"
}
```
只有 `test.public.products` 表的数据将被消费。
## 任务示例
### 简单示例
> 此示例读取 Kafka topic_1topic_2 topic_3 的数据并将其打印到客户端。如果尚未安装和部署 SeaTunnel,请按照 [安装指南](../../start-v2/locally/deployment.md) 进行安装和部署。然后,按照 [快速开始](../../start-v2/locally/quick-start-seatunnel-engine.md) 运行此任务。
```hocon
# 定义运行环境
env {
parallelism = 2
job.mode = "BATCH"
}
source {
Kafka {
schema = {
fields {
name = "string"
age = "int"
}
}
format = text
field_delimiter = "#"
topic = "topic_1,topic_2,topic_3"
bootstrap.servers = "localhost:9092"
kafka.config = {
client.id = client_1
max.poll.records = 500
auto.offset.reset = "earliest"
enable.auto.commit = "false"
}
}
}
sink {
Console {}
}
```
### 正则表达式主题
```hocon
source {
Kafka {
topic = ".*seatunnel*."
pattern = "true"
bootstrap.servers = "localhost:9092"
consumer.group = "seatunnel_group"
}
}
```
### AWS MSK SASL/SCRAM
将以下 `${username}` `${password}` 替换为 AWS MSK 中的配置值。
```hocon
source {
Kafka {
topic = "seatunnel"
bootstrap.servers = "xx.amazonaws.com.cn:9096,xxx.amazonaws.com.cn:9096,xxxx.amazonaws.com.cn:9096"
consumer.group = "seatunnel_group"
kafka.config = {
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"
}
}
}
```
### AWS MSK IAM
[此处](https://github.com/aws/aws-msk-iam-auth/releases) 下载 `aws-msk-iam-auth-1.1.5.jar` 并将其放在 `$SEATUNNEL_HOME/plugin/kafka/lib` 目录下。
确保 IAM 策略中包含 `"kafka-cluster:Connect"` 权限,如下所示:
```hocon
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster"
],
```
源配置示例:
```hocon
source {
Kafka {
topic = "seatunnel"
bootstrap.servers = "xx.amazonaws.com.cn:9098,xxx.amazonaws.com.cn:9098,xxxx.amazonaws.com.cn:9098"
consumer.group = "seatunnel_group"
kafka.config = {
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
}
}
```
### Kerberos 认证示例
请在启动 SeaTunnel 之前设置 JVM 参数 `java.security.krb5.conf` 或更新 `/etc/krb5.conf` 中的默认 `krb5.conf`
源配置示例:
```hocon
source {
Kafka {
topic = "seatunnel"
bootstrap.servers = "127.0.0.1:9092"
consumer.group = "seatunnel_group"
kafka.config = {
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required \n useKeyTab=true \n storeKey=true \n keyTab=\"/path/to/xxx.keytab\" \n principal=\"user@xxx.com\";"
}
}
}
```
### 多 Kafka 源示例
> 根据不同的 Kafka 主题和格式解析数据,并基于 ID 执行 upsert 操作。
> 注意: Kafka是一个非结构化数据源,应该使用`tables_configs`,将来会删除`table_list`
```hocon
env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
Kafka {
bootstrap.servers = "kafka_e2e:9092"
tables_configs = [
{
topic = "^test-ogg-sou.*"
pattern = "true"
consumer.group = "ogg_multi_group"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = ogg_json
},
{
topic = "test-cdc_mds"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = canal_json
}
]
}
}
sink {
Jdbc {
driver = org.postgresql.Driver
url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
user = test
password = test
generate_sink_sql = true
database = test
table = public.sink
primary_keys = ["id"]
}
}
```
```hocon
env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
Kafka {
bootstrap.servers = "kafka_e2e:9092"
table_list = [
{
topic = "^test-ogg-sou.*"
pattern = "true"
consumer.group = "ogg_multi_group"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = ogg_json
},
{
topic = "test-cdc_mds"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = canal_json
}
]
}
}
sink {
Jdbc {
driver = org.postgresql.Driver
url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
user = test
password = test
generate_sink_sql = true
database = test
table = public.sink
primary_keys = ["id"]
}
}
```
### Protobuf配置
`format` 设置为 `protobuf`,配置`protobuf`数据结构,`protobuf_message_name``protobuf_schema`参数
使用样例:
```hocon
source {
Kafka {
topic = "test_protobuf_topic_fake_source"
format = protobuf
protobuf_message_name = Person
protobuf_schema = """
syntax = "proto3";
package org.apache.seatunnel.format.protobuf;
option java_outer_classname = "ProtobufE2E";
message Person {
int32 c_int32 = 1;
int64 c_int64 = 2;
float c_float = 3;
double c_double = 4;
bool c_bool = 5;
string c_string = 6;
bytes c_bytes = 7;
message Address {
string street = 1;
string city = 2;
string state = 3;
string zip = 4;
}
Address address = 8;
map<string, float> attributes = 9;
repeated string phone_numbers = 10;
}
"""
bootstrap.servers = "kafkaCluster:9092"
start_mode = "earliest"
plugin_output = "kafka_table"
}
}
```
### 忽略无 Leader 分区
当处理可能存在临时 leader 问题的 Kafka 集群时,您可以配置连接器忽略没有 leader 的分区:
```hocon
source {
Kafka {
topic = "test_topic"
bootstrap.servers = "localhost:9092"
consumer.group = "test_group"
ignore_no_leader_partition = true
start_mode = "earliest"
}
}
```
`ignore_no_leader_partition = true` 时,连接器将在分区发现过程中跳过任何没有 leader 的分区,允许作业继续处理其他健康的分区。
### format
如果需要保留Kafka原生的信息,可以参考如下配置。
配置示例:
```hocon
source {
Kafka {
topic = "test_topic_native_source"
bootstrap.servers = "kafkaCluster:9092"
start_mode = "earliest"
format_error_handle_way = skip
format = "NATIVE"
value_converter_schema_enabled = false
consumer.group = "native_group"
}
}
```
返回数据格式如下:
```json
{
"headers": {
"header1": "header1",
"header2": "header2"
},
"key": "dGVzdF9ieXRlc19kYXRh",
"partition": 3,
"timestamp": 1672531200000,
"timestampType": "CREATE_TIME",
"value": "dGVzdF9ieXRlc19kYXRh"
}
```
注意:key/valuebyte[]类型。
## 变更日志
<ChangeLog />