blob: 2583c205fc77c85de15e039eb19ef3a6a3c45638 [file] [log] [blame] [view]
---
id: io-canal-source
title: Canal source connector
sidebar_label: "Canal source connector"
---
:::note
You can download all the Pulsar connectors on [download page](pathname:///download).
:::
The Canal source connector pulls messages from MySQL to Pulsar topics.
## Configuration
The configuration of the Canal source connector has the following properties.
### Property
| Name | Required | Default | Description |
|------|----------|---------|-------------|
| `username` | true | None | Canal server account (not MySQL).|
| `password` | true | None | Canal server password (not MySQL). |
|`destination`|true|None|Source destination that Canal source connector connects to.
| `singleHostname` | false | None | Canal server address.|
| `singlePort` | false | None | Canal server port.|
| `cluster` | true | false | Whether to enable cluster mode based on Canal server configuration or not.<br /><br /><li>true: **cluster** mode.<br />If set to true, it talks to `zkServers` to figure out the actual database host.<br /><br /></li><li>false: **standalone** mode.<br />If set to false, it connects to the database specified by `singleHostname` and `singlePort`. </li>|
| `zkServers` | true | None | Address and port of the Zookeeper that Canal source connector talks to figure out the actual database host.|
| `batchSize` | false | 1000 | Batch size to fetch from Canal. |
### Example
Before using the Canal connector, you can create a configuration file through one of the following methods.
* JSON
```json
{
"zkServers": "127.0.0.1:2181",
"batchSize": "5120",
"destination": "example",
"username": "",
"password": "",
"cluster": false,
"singleHostname": "127.0.0.1",
"singlePort": "11111",
}
```
* YAML
You can create a YAML file and copy the [contents](https://github.com/apache/pulsar/blob/master/pulsar-io/canal/src/main/resources/canal-mysql-source-config.yaml) below to your YAML file.
```yaml
configs:
zkServers: "127.0.0.1:2181"
batchSize: 5120
destination: "example"
username: ""
password: ""
cluster: false
singleHostname: "127.0.0.1"
singlePort: 11111
```
## Usage
Here is an example of storing MySQL data using the configuration file as above.
1. Start a MySQL server.
```bash
docker pull mysql:5.7
docker run -d -it --rm --name pulsar-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=canal -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw mysql:5.7
```
2. Create a configuration file `mysqld.cnf`.
```properties
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
#log-error = /var/log/mysql/error.log
# By default we only accept connections from localhost
#bind-address = 127.0.0.1
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
log-bin=mysql-bin
binlog-format=ROW
server_id=1
```
3. Copy the configuration file `mysqld.cnf` to MySQL server.
```bash
docker cp mysqld.cnf pulsar-mysql:/etc/mysql/mysql.conf.d/
```
4. Restart the MySQL server.
```bash
docker restart pulsar-mysql
```
5. Create a test database in MySQL server.
```bash
docker exec -it pulsar-mysql /bin/bash
mysql -h 127.0.0.1 -uroot -pcanal -e 'create database test;'
```
6. Start a Canal server and connect to MySQL server.
```bash
docker pull canal/canal-server:v1.1.2
docker run -d -it --link pulsar-mysql -e canal.auto.scan=false -e canal.destinations=test -e canal.instance.master.address=pulsar-mysql:3306 -e canal.instance.dbUsername=root -e canal.instance.dbPassword=canal -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false --name=pulsar-canal-server -p 8000:8000 -p 2222:2222 -p 11111:11111 -p 11112:11112 -m 4096m canal/canal-server:v1.1.2
```
7. Start Pulsar standalone.
```bash
docker pull apachepulsar/pulsar:@pulsar:version@
docker run --user 0 -d -it --link pulsar-canal-server -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone apachepulsar/pulsar:@pulsar:version@ bin/pulsar standalone
```
8. Modify the configuration file `canal-mysql-source-config.yaml`.
```yaml
configs:
zkServers: ""
batchSize: "5120"
destination: "test"
username: ""
password: ""
cluster: false
singleHostname: "pulsar-canal-server"
singlePort: "11111"
```
9. Create a consumer file `pulsar-client.py`.
```python
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic',
subscription_name='my-sub')
while True:
msg = consumer.receive()
print("Received message: '%s'" % msg.data())
consumer.acknowledge(msg)
client.close()
```
10. Copy the configuration file `canal-mysql-source-config.yaml` and the consumer file `pulsar-client.py` to Pulsar server.
```bash
docker cp canal-mysql-source-config.yaml pulsar-standalone:/pulsar/conf/
docker cp pulsar-client.py pulsar-standalone:/pulsar/
```
11. Download a Canal connector and start it.
```bash
docker exec -it pulsar-standalone /bin/bash
curl -LO --output-dir connectors "https://www.apache.org/dyn/closer.lua/pulsar/pulsar-@pulsar:version@/connectors/pulsar-io-canal-@pulsar:version@.nar?action=download"
./bin/pulsar-admin source localrun \
--archive $PWD/connectors/pulsar-io-canal-@pulsar:version@.nar \
--classname org.apache.pulsar.io.canal.CanalStringSource \
--tenant public \
--namespace default \
--name canal \
--destination-topic-name my-topic \
--source-config-file /pulsar/conf/canal-mysql-source-config.yaml \
--parallelism 1
```
12. Consume data from MySQL.
```bash
docker exec -it pulsar-standalone /bin/bash
python pulsar-client.py
```
13. Open another window to log in MySQL server.
```bash
docker exec -it pulsar-mysql /bin/bash
mysql -h 127.0.0.1 -uroot -pcanal
```
14. Create a table, and insert, delete, and update data in MySQL server.
```bash
mysql> use test;
mysql> show tables;
mysql> CREATE TABLE IF NOT EXISTS `test_table`(`test_id` INT UNSIGNED AUTO_INCREMENT,`test_title` VARCHAR(100) NOT NULL,
`test_author` VARCHAR(40) NOT NULL,
`test_date` DATE,PRIMARY KEY ( `test_id` ))ENGINE=InnoDB DEFAULT CHARSET=utf8;
mysql> INSERT INTO test_table (test_title, test_author, test_date) VALUES("a", "b", NOW());
mysql> UPDATE test_table SET test_title='c' WHERE test_title='a';
mysql> DELETE FROM test_table WHERE test_title='c';
```