import ChangeLog from ‘../changelog/connector-kafka.md’;
Kafka 源连接器
Spark
Flink
Seatunnel Zeta
用于 Apache Kafka 的源连接器。
使用 Kafka 连接器需要以下依赖项。
可以通过 install-plugin.sh 下载或从 Maven 中央仓库获取。
数据源 | 支持的版本 | Maven 下载链接 |
---|---|---|
Kafka | 通用版本 | 下载 |
名称 | 类型 | 是否必填 | 默认值 | 描述 |
---|---|---|---|---|
topic | String | 是 | - | 使用表作为数据源时要读取数据的主题名称。它也支持通过逗号分隔的多个主题列表,例如 ‘topic-1,topic-2’。 |
table_list | Map | 否 | - | 主题列表配置,你可以同时配置一个 table_list 和一个 topic 。 |
bootstrap.servers | String | 是 | - | 逗号分隔的 Kafka brokers 列表。 |
pattern | Boolean | 否 | false | 如果 pattern 设置为 true ,则会使用指定的正则表达式匹配并订阅主题。 |
consumer.group | String | 否 | SeaTunnel-Consumer-Group | Kafka 消费者组 ID ,用于区分不同的消费者组。 |
commit_on_checkpoint | Boolean | 否 | true | 如果为 true,消费者的偏移量将会定期在后台提交。 |
poll.timeout | Long | 否 | 10000 | kafka主动拉取时间间隔(毫秒)。 |
kafka.config | Map | 否 | - | 除了上述必要参数外,用户还可以指定多个非强制的消费者客户端参数,覆盖 Kafka 官方文档 中指定的所有消费者参数。 |
schema | Config | 否 | - | 数据结构,包括字段名称和字段类型。 |
format | String | 否 | json | 数据格式。默认格式为 json。可选格式包括 text, canal_json, debezium_json, ogg_json, maxwell_json, avro , protobuf和native。默认字段分隔符为 ", "。如果自定义分隔符,添加 “field_delimiter” 选项。如果使用 canal 格式,请参考 canal-json 了解详细信息。如果使用 debezium 格式,请参考 debezium-json。一些Format的详细信息请参考 formats |
format_error_handle_way | String | 否 | fail | 数据格式错误的处理方式。默认值为 fail,可选值为 fail 和 skip。当选择 fail 时,数据格式错误将阻塞并抛出异常。当选择 skip 时,数据格式错误将跳过此行数据。 |
debezium_record_table_filter | Config | 否 | - | 用于过滤 debezium 格式的数据,仅当格式设置为 debezium_json 时使用。请参阅下面的 debezium_record_table_filter |
field_delimiter | String | 否 | , | 自定义数据格式的字段分隔符。 |
start_mode | StartMode[earliest],[group_offsets] | 否 | group_offsets | 消费者的初始消费模式。 |
start_mode.offsets | Config | 否 | - | 用于 specific_offsets 消费模式的偏移量。 |
start_mode.timestamp | Long | 否 | - | 用于 “timestamp” 消费模式的时间。 |
start_mode.end_timestamp | Long | 否 | - | 用于 “timestamp” 消费模式的结束时间,只支持批模式 |
partition-discovery.interval-millis | Long | 否 | -1 | 动态发现主题和分区的间隔时间。 |
ignore_no_leader_partition | Boolean | 否 | false | 是否忽略没有 leader 的分区。如果设置为 true,在分区发现过程中将跳过没有 leader 的分区。如果设置为 false(默认值),连接器将包含所有分区,无论 leader 状态如何。这在处理可能存在临时 leader 问题的 Kafka 集群时很有用。 |
common-options | 否 | - | 源插件的常见参数,详情请参考 Source Common Options。 | |
protobuf_message_name | String | 否 | - | 当格式设置为 protobuf 时有效,指定消息名称。 |
protobuf_schema | String | 否 | - | 当格式设置为 protobuf 时有效,指定 Schema 定义。 |
reader_cache_queue_size | Integer | 否 | 1024 | Reader分片缓存队列,用于缓存分片对应的数据。占用大小取决于每个reader得到的分片量,而不是每个分片的数据量。 |
is_native | Boolean | No | false | 支持保留record的源信息。 |
我们可以使用 debezium_record_table_filter
来过滤 debezium 格式的数据。配置如下:
debezium_record_table_filter { database_name = "test" schema_name = "public" // null 如果不存在 table_name = "products" }
只有 test.public.products
表的数据将被消费。
此示例读取 Kafka 的 topic_1、topic_2 和 topic_3 的数据并将其打印到客户端。如果尚未安装和部署 SeaTunnel,请按照 安装指南 进行安装和部署。然后,按照 快速开始 运行此任务。
# 定义运行环境 env { parallelism = 2 job.mode = "BATCH" } source { Kafka { schema = { fields { name = "string" age = "int" } } format = text field_delimiter = "#" topic = "topic_1,topic_2,topic_3" bootstrap.servers = "localhost:9092" kafka.config = { client.id = client_1 max.poll.records = 500 auto.offset.reset = "earliest" enable.auto.commit = "false" } } } sink { Console {} }
source { Kafka { topic = ".*seatunnel*." pattern = "true" bootstrap.servers = "localhost:9092" consumer.group = "seatunnel_group" } }
将以下 ${username}
和 ${password}
替换为 AWS MSK 中的配置值。
source { Kafka { topic = "seatunnel" bootstrap.servers = "xx.amazonaws.com.cn:9096,xxx.amazonaws.com.cn:9096,xxxx.amazonaws.com.cn:9096" consumer.group = "seatunnel_group" kafka.config = { security.protocol=SASL_SSL sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";" } } }
从 此处 下载 aws-msk-iam-auth-1.1.5.jar
并将其放在 $SEATUNNEL_HOME/plugin/kafka/lib
目录下。
确保 IAM 策略中包含 "kafka-cluster:Connect"
权限,如下所示:
"Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster" ],
源配置示例:
source { Kafka { topic = "seatunnel" bootstrap.servers = "xx.amazonaws.com.cn:9098,xxx.amazonaws.com.cn:9098,xxxx.amazonaws.com.cn:9098" consumer.group = "seatunnel_group" kafka.config = { security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" } } }
请在启动 SeaTunnel 之前设置 JVM 参数 java.security.krb5.conf
或更新 /etc/krb5.conf
中的默认 krb5.conf
。
源配置示例:
source { Kafka { topic = "seatunnel" bootstrap.servers = "127.0.0.1:9092" consumer.group = "seatunnel_group" kafka.config = { security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka sasl.mechanism=GSSAPI sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required \n useKeyTab=true \n storeKey=true \n keyTab=\"/path/to/xxx.keytab\" \n principal=\"user@xxx.com\";" } } }
根据不同的 Kafka 主题和格式解析数据,并基于 ID 执行 upsert 操作。
注意: Kafka是一个非结构化数据源,应该使用
tables_configs
,将来会删除table_list
env { execution.parallelism = 1 job.mode = "BATCH" } source { Kafka { bootstrap.servers = "kafka_e2e:9092" tables_configs = [ { topic = "^test-ogg-sou.*" pattern = "true" consumer.group = "ogg_multi_group" start_mode = earliest schema = { fields { id = "int" name = "string" description = "string" weight = "string" } }, format = ogg_json }, { topic = "test-cdc_mds" start_mode = earliest schema = { fields { id = "int" name = "string" description = "string" weight = "string" } }, format = canal_json } ] } } sink { Jdbc { driver = org.postgresql.Driver url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF" user = test password = test generate_sink_sql = true database = test table = public.sink primary_keys = ["id"] } }
env { execution.parallelism = 1 job.mode = "BATCH" } source { Kafka { bootstrap.servers = "kafka_e2e:9092" table_list = [ { topic = "^test-ogg-sou.*" pattern = "true" consumer.group = "ogg_multi_group" start_mode = earliest schema = { fields { id = "int" name = "string" description = "string" weight = "string" } }, format = ogg_json }, { topic = "test-cdc_mds" start_mode = earliest schema = { fields { id = "int" name = "string" description = "string" weight = "string" } }, format = canal_json } ] } } sink { Jdbc { driver = org.postgresql.Driver url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF" user = test password = test generate_sink_sql = true database = test table = public.sink primary_keys = ["id"] } }
format
设置为 protobuf
,配置protobuf
数据结构,protobuf_message_name
和protobuf_schema
参数
使用样例:
source { Kafka { topic = "test_protobuf_topic_fake_source" format = protobuf protobuf_message_name = Person protobuf_schema = """ syntax = "proto3"; package org.apache.seatunnel.format.protobuf; option java_outer_classname = "ProtobufE2E"; message Person { int32 c_int32 = 1; int64 c_int64 = 2; float c_float = 3; double c_double = 4; bool c_bool = 5; string c_string = 6; bytes c_bytes = 7; message Address { string street = 1; string city = 2; string state = 3; string zip = 4; } Address address = 8; map<string, float> attributes = 9; repeated string phone_numbers = 10; } """ bootstrap.servers = "kafkaCluster:9092" start_mode = "earliest" plugin_output = "kafka_table" } }
当处理可能存在临时 leader 问题的 Kafka 集群时,您可以配置连接器忽略没有 leader 的分区:
source { Kafka { topic = "test_topic" bootstrap.servers = "localhost:9092" consumer.group = "test_group" ignore_no_leader_partition = true start_mode = "earliest" } }
当 ignore_no_leader_partition = true
时,连接器将在分区发现过程中跳过任何没有 leader 的分区,允许作业继续处理其他健康的分区。
如果需要保留Kafka原生的信息,可以参考如下配置。
配置示例:
source { Kafka { topic = "test_topic_native_source" bootstrap.servers = "kafkaCluster:9092" start_mode = "earliest" format_error_handle_way = skip format = "NATIVE" value_converter_schema_enabled = false consumer.group = "native_group" } }
返回数据格式如下:
{ "headers": { "header1": "header1", "header2": "header2" }, "key": "dGVzdF9ieXRlc19kYXRh", "partition": 3, "timestamp": 1672531200000, "timestampType": "CREATE_TIME", "value": "dGVzdF9ieXRlc19kYXRh" }
注意:key/value是byte[]类型。