blob: 5aeb5702a360e699e505b5382821f625c761a5e5 [file] [view]
---
{
"title": "Flink",
"language": "en",
"description": "How to use Flink Doris Connector to import Flink real-time data (Kafka, MySQL, etc.) into Doris? Includes complete steps for table creation, synchronization, and verification.",
"keywords": [
"Flink import to Doris",
"Flink Doris Connector",
"Flink CDC sync to Doris",
"FlinkSQL write to Doris",
"real-time data import to Doris",
"Kafka MySQL sync to Doris"
]
}
---
<!-- Knowledge type: Operational steps -->
<!-- Applicable scenarios: Real-time data import / Streaming ETL -->
The **Flink Doris Connector** lets you import data produced by Flink (such as data Flink reads from Kafka or MySQL) into Doris in real time. It is suitable for real-time data ingestion and streaming ETL scenarios.
## Applicable scenarios
| Scenario | Description |
| --- | --- |
| Real-time data ingestion | Write data from message queues such as Kafka or Pulsar into Doris in real time |
| Database synchronization | Synchronize data from databases such as MySQL or Oracle to Doris through Flink CDC |
| Streaming ETL | Perform real-time computation with Flink and write the results to Doris |
## Limitations
- Requires a **Flink cluster** that you have already deployed.
- Requires the corresponding version of the **Flink Doris Connector** to be deployed in Flink.
## Procedure
For complete instructions on importing data with Flink, see [Flink-Doris-Connector](../../../connection-integration/data-integration/flink-doris-connector.md). The following minimal example shows how to quickly complete an import through Flink.
The overall workflow consists of the following three steps:
1. Create the target table in Doris
2. Write data through FlinkSQL in Flink
3. Verify in Doris that the data has been imported successfully
### Step 1: Create a table in Doris
Create the target table `students` in Doris to receive data from Flink:
```sql
CREATE TABLE `students` (
`id` INT NULL,
`name` VARCHAR(256) NULL,
`age` INT NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
```
### Step 2: Import data with Flink
Run `bin/sql-client.sh` to open the FlinkSQL console, and execute the following statements to create a sink table and write data:
```sql
CREATE TABLE student_sink (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '10.16.10.6:28737',
'table.identifier' = 'test.students',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label'
);
INSERT INTO student_sink values(1,'zhangsan',123)
```
The key parameters are described below:
| Parameter | Description |
| --- | --- |
| `connector` | Fixed as `doris`, indicating that the Flink Doris Connector is used |
| `fenodes` | The HTTP address of the Doris FE, in the format `host:http_port` |
| `table.identifier` | The identifier of the target table, in the format `database.table` |
| `username` | The Doris login username |
| `password` | The Doris login password |
| `sink.label-prefix` | The Label prefix for the Stream Load import task. It must be globally unique |
### Step 3: Check the imported data
Query the target table in Doris to confirm that the data has been imported successfully:
```sql
select * from test.students;
+------+----------+------+
| id | name | age |
+------+----------+------+
| 1 | zhangsan | 123 |
+------+----------+------+
```
## FAQ
**Q1: Which data sources does the Flink Doris Connector support?**
In theory, all data sources supported by Flink (Kafka, MySQL CDC, file systems, message queues, etc.) can be used as upstream sources, processed by Flink, and then written to Doris.
**Q2: Why does `sink.label-prefix` need to be unique?**
The Flink Doris Connector implements imports based on Doris Stream Load. Each transaction requires a unique Label to guarantee Exactly-Once semantics. Duplicate Labels cause import conflicts.
**Q3: Which port should `fenodes` use?**
`fenodes` takes the **HTTP port** of the Doris FE (default `8030`), not the MySQL protocol port (default `9030`).
**Q4: How can I synchronize data from databases such as MySQL or Oracle to Doris?**
You can use it together with Flink CDC. For details, see the [Flink-Doris-Connector](../../../connection-integration/data-integration/flink-doris-connector.md) documentation.
## Related documents
- [Flink-Doris-Connector](../../../connection-integration/data-integration/flink-doris-connector.md)