import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
# Kafka
> Kafka source connector
## Description
Consumes data from `Kafka`. The supported `Kafka version` is `>= 0.10.0`.
:::tip
Supported engines and plugin names
* [x] Spark: KafkaStream
* [x] Flink: Kafka
:::
## Options
<Tabs
groupId="engine-type"
defaultValue="spark"
values={[
{label: 'Spark', value: 'spark'},
{label: 'Flink', value: 'flink'},
]}>
<TabItem value="spark">
| name | type | required | default value |
| -------------------------- | ------ | -------- | ------------- |
| topics | string | yes | - |
| consumer.group.id | string | yes | - |
| consumer.bootstrap.servers | string | yes | - |
| consumer.* | string | no | - |
| common-options | string | yes | - |
</TabItem>
<TabItem value="flink">
| name | type | required | default value |
| -------------------------- | ------ | -------- | ------------- |
| topics | string | yes | - |
| consumer.group.id | string | yes | - |
| consumer.bootstrap.servers | string | yes | - |
| schema | string | yes | - |
| format.type | string | yes | - |
| format.* | string | no | - |
| consumer.* | string | no | - |
| rowtime.field | string | no | - |
| watermark | long | no | - |
| offset.reset | string | no | - |
| common-options | string | no | - |
</TabItem>
</Tabs>
### topics [string]
`Kafka topic` name. To read from multiple topics, separate them with `,`, for example: `"tpc1,tpc2"`.
### consumer.group.id [string]
`Kafka consumer group id`, used to distinguish different consumer groups
### consumer.bootstrap.servers [string]
`Kafka` cluster address, separated by `,`
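Putting the three required connection options together, a minimal fragment might look like this (the topic, group id, and broker addresses below are placeholders):

```bash
# minimal required connection options (all values are illustrative)
topics = "seatunnel"
consumer.group.id = "seatunnel_group"
consumer.bootstrap.servers = "broker1:9092,broker2:9092"
```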
<Tabs
groupId="engine-type"
defaultValue="spark"
values={[
{label: 'Spark', value: 'spark'},
{label: 'Flink', value: 'flink'},
]}>
<TabItem value="spark">
</TabItem>
<TabItem value="flink">
### format.type [string]
Currently, three formats are supported:
- json
- csv
- avro
### format.* [string]
The `csv` format uses this parameter to set format properties such as the delimiter. For example, to set the column delimiter to `\t`, specify `format.field-delimiter=\\t`.
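For instance, a `csv` source could configure its delimiter and parse-error handling like this (the property values are illustrative; any `format.*` property of the underlying format can be passed through the same way):

```bash
format.type = csv
# format.* options are forwarded to the csv format (values are illustrative)
format.field-delimiter = ";"
format.ignore-parse-errors = "true"
```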
### schema [string]
- csv
- The `schema` of `csv` is a `jsonArray` string, such as `"[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"`, as shown in the sketch after this list.
- json
- The `schema` parameter of `json` is a sample `json string` of the original data, from which the `schema` is derived automatically. The sample must contain every field of the data; otherwise the missing fields will be lost.
- avro
- The `schema` parameter of `avro` is a standard `avro schema JSON string`, such as `{\"name\":\"test\",\"type\":\"record\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"long\"},{\"name\":\"addrs\",\"type\":{\"name\":\"addrs\",\"type\":\"record\",\"fields\":[{\"name\":\"province\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"}]}}]}`
- To learn more about how the `Avro Schema JSON string` should be defined, please refer to: https://avro.apache.org/docs/current/spec.html
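As a concrete sketch, a `csv` source declaring two columns would combine `format.type` and `schema` like this (the field names are illustrative):

```bash
format.type = csv
# jsonArray schema describing the csv columns (field names are illustrative)
schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
```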
### rowtime.field [string]
Extracts the event-time timestamp from the configured field, which Flink uses to generate event-time watermarks.
### watermark [long]
Sets a built-in watermark strategy for `rowtime.field` attributes that are out of order by a bounded time interval. Emits watermarks that are the maximum observed timestamp minus the specified delay.
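For example, to derive event time from a field named `ts` and tolerate out-of-order records up to a bounded delay, the two options might be combined like this (the field name and delay value are placeholders; the delay is assumed to be in milliseconds):

```bash
# event-time field plus bounded out-of-orderness delay (values are illustrative)
rowtime.field = "ts"
watermark = 5000
```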
### offset.reset [string]
The consumer's initial `offset`, which takes effect only for new consumers (consumer groups without committed offsets). There are three modes:
- latest
- Start consumption from the latest offset
- earliest
- Start consumption from the earliest offset
- specific
- Start consumption from a specified `offset`, giving the `start offset` of each partition via `offset.reset.specific="{0:111,1:123}"`, as in the sketch below.
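For example, to start partition 0 at offset 111 and partition 1 at offset 123 (both offsets are illustrative):

```bash
offset.reset = specific
# per-partition start offsets, keyed by partition id (values are illustrative)
offset.reset.specific = "{0:111,1:123}"
```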
</TabItem>
</Tabs>
### consumer.* [string]
In addition to the required parameters above, users can specify any number of optional `Kafka consumer` client parameters, covering [all consumer parameters specified in the official Kafka documentation](https://kafka.apache.org/documentation.html#consumerconfigs).
To specify such a parameter, add the prefix `consumer.` to the original parameter name. For example, `auto.offset.reset` is specified as `consumer.auto.offset.reset = latest`. Parameters that are not specified fall back to the default values given in the official Kafka documentation.
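For example, to control the initial offset and the fetch batch size through pass-through parameters (both values are illustrative; `max.poll.records` is a standard Kafka consumer property):

```bash
# standard Kafka consumer properties, prefixed with "consumer."
consumer.auto.offset.reset = latest
consumer.max.poll.records = "500"
```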
### common-options [string]
Source plugin common parameters. Please refer to [Source Plugin](common-options.mdx) for details.
## Examples
<Tabs
groupId="engine-type"
defaultValue="spark"
values={[
{label: 'Spark', value: 'spark'},
{label: 'Flink', value: 'flink'},
]}>
<TabItem value="spark">
```bash
kafkaStream {
topics = "seatunnel"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "seatunnel_group"
}
```
</TabItem>
<TabItem value="flink">
```bash
KafkaTableStream {
consumer.bootstrap.servers = "127.0.0.1:9092"
consumer.group.id = "seatunnel5"
topics = test
result_table_name = test
format.type = csv
schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
format.field-delimiter = ";"
format.allow-comments = "true"
format.ignore-parse-errors = "true"
}
```
</TabItem>
</Tabs>