Flink SQL Kafka Connector

A Kafka connector based on Flink SQL.


With the Kafka connector, we can read data from Kafka and write data to Kafka using Flink SQL. Refer to the Kafka connector documentation for more details.


The following brief example shows how to use the connector end to end.

1. Prepare Kafka

Refer to the Kafka QuickStart to prepare a Kafka environment, then start a console producer as follows:

$ bin/kafka-console-producer.sh --topic <topic-name> --bootstrap-server localhost:9092

The command enters interactive mode. Type one JSON message per line, using the fields id and name expected by the table in step 2, to send data to Kafka.
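As an alternative to typing messages interactively, the records can be written to a file and piped into the producer. The following is a minimal sketch, not part of the original walkthrough: the sample records are inferred from the table schema in step 2 and the result shown in step 5, and both the file location and the `TOPIC_NAME` variable are hypothetical names chosen for illustration.

```shell
# Sample records inferred from the schema (id INT, name STRING) and the
# printed result in step 5; adjust them to your own data.
events_file="${TMPDIR:-/tmp}/events.json"
cat > "$events_file" <<'EOF'
{"id": 1, "name": "abc"}
{"id": 2, "name": "def"}
{"id": 3, "name": "dfs"}
{"id": 4, "name": "eret"}
{"id": 5, "name": "yui"}
EOF

# Assumption: the current directory is the Kafka installation directory.
# The producer reads one message per line from standard input until EOF.
# TOPIC_NAME is a hypothetical variable holding your topic name.
if [ -x bin/kafka-console-producer.sh ]; then
  bin/kafka-console-producer.sh \
    --topic "${TOPIC_NAME:?set TOPIC_NAME to your topic}" \
    --bootstrap-server localhost:9092 < "$events_file"
fi
```

Each line of the file becomes one Kafka message, which matches the one-record-per-message layout that the 'json' format reads.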


2. Prepare the SeaTunnel configuration

Here is a simple SeaTunnel configuration example. Save it as config/kafka.sql.conf, the file passed to the start script in step 4.

SET table.dml-sync = true;

CREATE TABLE events (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '<topic-name>',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

CREATE TABLE print_table (
    id INT,
    name STRING
) WITH (
    'connector' = 'print',
    'sink.parallelism' = '1'
);

INSERT INTO print_table SELECT * FROM events;

3. Start a local Flink cluster

$ ${FLINK_HOME}/bin/start-cluster.sh

4. Start the Flink SQL job

Run the following command from the SeaTunnel home directory to start the Flink SQL job.

$ bin/start-seatunnel-sql.sh -c config/kafka.sql.conf

5. Verify the result

After the job is submitted, the rows printed by the 'print' connector appear in the TaskManager's log.

+I[1, abc]
+I[2, def]
+I[3, dfs]
+I[4, eret]
+I[5, yui]
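To pick these rows out of the surrounding log output, a small filter can help. This is a sketch under assumptions: `print_rows` is a hypothetical helper name, and the file pattern assumes a standalone cluster that writes TaskManager standard output to files matching log/flink-*-taskexecutor-*.out under FLINK_HOME. Check your own log directory if the pattern does not match.

```shell
# Hypothetical helper: keep only changelog rows such as "+I[1, abc]".
# Row kinds emitted by the print connector are +I, -U, +U and -D.
print_rows() {
  grep -h -E '^[+-][IUD]\[' "$@"
}

# Assumption: FLINK_HOME points at the Flink installation used in step 3.
# The call is skipped when FLINK_HOME is not set.
if [ -n "${FLINK_HOME:-}" ]; then
  print_rows "${FLINK_HOME}"/log/flink-*-taskexecutor-*.out
fi
```

Because 'sink.parallelism' is set to '1' in the configuration, the rows are printed without a subtask prefix, so anchoring the pattern at the start of the line is sufficient.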