---
id: io-kafka-source
title: Kafka source connector
sidebar_label: "Kafka source connector"
---

The Kafka source connector pulls messages from Kafka topics and persists the messages to Pulsar topics.

This guide explains how to configure and use the Kafka source connector.

## Configuration

The configuration of the Kafka source connector has the following properties.

### Property
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `bootstrapServers` | String | true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
| `securityProtocol` | String | false | " " (empty string) | The protocol used to communicate with Kafka brokers. |
| `saslMechanism` | String | false | " " (empty string) | The SASL mechanism used for Kafka client connections. |
| `saslJaasConfig` | String | false | " " (empty string) | The JAAS login context parameters for SASL connections in the format used by JAAS configuration files. |
| `sslEnabledProtocols` | String | false | " " (empty string) | The list of protocols enabled for SSL connections. |
| `sslEndpointIdentificationAlgorithm` | String | false | " " (empty string) | The endpoint identification algorithm used to validate server hostnames against the server certificate. |
| `sslTruststoreLocation` | String | false | " " (empty string) | The location of the trust store file. |
| `sslTruststorePassword` | String | false | " " (empty string) | The password of the trust store file. |
| `groupId` | String | true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. |
| `fetchMinBytes` | long | false | 1 | The minimum number of bytes expected for each fetch response. |
| `autoCommitEnabled` | boolean | false | true | If set to true, the consumer's offset is periodically committed in the background.<br /><br />If the process fails, the committed offset is used as the position from which a new consumer begins. |
| `autoCommitIntervalMs` | long | false | 5000 | The frequency in milliseconds at which consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. |
| `heartbeatIntervalMs` | long | false | 3000 | The interval between heartbeats to the consumer coordinator when using Kafka's group management facilities.<br /><br />**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**. |
| `sessionTimeoutMs` | long | false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. |
| `topic` | String | true | " " (empty string) | The Kafka topic from which messages are pulled and sent to Pulsar. |
| `consumerConfigProperties` | Map | false | " " (empty string) | The consumer configuration properties to be passed to consumers.<br /><br />**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
| `keyDeserializationClass` | String | false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br /> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java). |
| `valueDeserializationClass` | String | false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values. |
| `autoOffsetReset` | String | false | earliest | The default offset reset policy. |

### Schema management

The Kafka source connector applies a schema to the Pulsar topic depending on the data type present on the Kafka topic.
The data type is detected from the `keyDeserializationClass` and `valueDeserializationClass` configuration parameters.

If `valueDeserializationClass` is `org.apache.kafka.common.serialization.StringDeserializer`, the schema type on the Pulsar topic is set to `Schema.STRING()`.

If `valueDeserializationClass` is `io.confluent.kafka.serializers.KafkaAvroDeserializer`, Pulsar downloads the AVRO schema from the Confluent Schema Registry® and sets it properly on the Pulsar topic.

In this case, you need to set `schema.registry.url` inside the `consumerConfigProperties` configuration entry of the source.
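
For example, a source configuration that reads AVRO-encoded values and points the deserializer at a Schema Registry could look like the following sketch. The topic name, bootstrap servers, and registry URL are placeholder values:

```yaml
configs:
  bootstrapServers: "localhost:9092"
  groupId: "test-pulsar-io"
  topic: "my-avro-topic"
  valueDeserializationClass: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  consumerConfigProperties:
    "schema.registry.url": "http://localhost:8081"
```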

If `keyDeserializationClass` is not `org.apache.kafka.common.serialization.StringDeserializer`, the key is not a string, and the Kafka source uses the KeyValue schema type with SEPARATED encoding.

Pulsar supports the AVRO format for keys.

In this case, the Pulsar topic has the following properties:
- Schema: KeyValue schema with SEPARATED encoding
- Key: the key content of the Kafka message (base64-encoded)
- Value: the value content of the Kafka message
- KeySchema: the schema detected from `keyDeserializationClass`
- ValueSchema: the schema detected from `valueDeserializationClass`

Topic compaction and partition routing use the Pulsar key, which contains the Kafka key, so they are driven by the same value that you have on Kafka.

When you consume data from the Pulsar topic, you can use the `KeyValue` schema to decode the data properly.
If you want to access the raw key, you can use the `Message#getKeyBytes()` API.
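
`Message#getKeyBytes()` is part of the Java client API. With the Python client used later in this guide, a minimal sketch for inspecting the key and raw payload could look like the following; the topic and subscription names are placeholders, and decoding the key with base64 assumes a non-string Kafka key stored as described above:

```python
import base64
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', subscription_name='keyed-sub')

msg = consumer.receive()
# The Pulsar message key carries the Kafka key (base64-encoded for non-string keys).
kafka_key = base64.b64decode(msg.partition_key())
print("Kafka key bytes:", kafka_key)
print("Raw payload bytes:", msg.data())
consumer.acknowledge(msg)

client.close()
```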

### Example

Before using the Kafka source connector, you need to create a configuration file through one of the following methods.

- JSON

  ```json
  {
    "bootstrapServers": "pulsar-kafka:9092",
    "groupId": "test-pulsar-io",
    "topic": "my-topic",
    "sessionTimeoutMs": "10000",
    "autoCommitEnabled": false
  }
  ```

- YAML

  ```yaml
  configs:
    bootstrapServers: "pulsar-kafka:9092"
    groupId: "test-pulsar-io"
    topic: "my-topic"
    sessionTimeoutMs: "10000"
    autoCommitEnabled: false
  ```

## Usage

You can use the Kafka source connector as a Pulsar built-in connector on a standalone cluster or an on-premises cluster.

### Standalone cluster

This example describes how to use the Kafka source connector to feed data from Kafka and write data to Pulsar topics in standalone mode.

#### Prerequisites

- Install [Docker](https://docs.docker.com/get-docker/) (Community Edition).

#### Steps

1. Download and start the Confluent Platform.
   For details, see the [documentation](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) to install the Kafka service locally.

2. Pull a Pulsar image and start Pulsar in standalone mode.

   ```bash
   docker pull apachepulsar/pulsar:latest

   docker run -d -it \
     -p 6650:6650 -p 8080:8080 \
     -v $PWD/data:/pulsar/data \
     --name pulsar-kafka-standalone \
     apachepulsar/pulsar:latest bin/pulsar standalone
   ```
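
   If you want to confirm that the standalone cluster is up before continuing, you can check the container logs with a standard Docker command:

   ```bash
   docker logs --tail 20 pulsar-kafka-standalone
   ```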

3. Create a producer file _kafka-producer.py_.

   ```python
   from kafka import KafkaProducer

   # Connect to the local Kafka broker and publish a single test message.
   producer = KafkaProducer(bootstrap_servers='localhost:9092')
   future = producer.send('my-topic', b'hello world')
   # Block until the broker acknowledges the send.
   future.get()
   ```
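
   The `kafka` import comes from the `kafka-python` package. If it is not already available, install it with pip before running the script:

   ```bash
   pip3 install kafka-python
   ```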

4. Create a consumer file _pulsar-client.py_.

   ```python
   import pulsar

   client = pulsar.Client('pulsar://localhost:6650')
   consumer = client.subscribe('my-topic', subscription_name='my-aa')

   while True:
       msg = consumer.receive()
       print("Received message: '%s'" % msg.data())
       consumer.acknowledge(msg)

   client.close()
   ```
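
   The script uses the Pulsar Python client, which you can install with pip if needed:

   ```bash
   pip3 install pulsar-client
   ```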

5. Copy the following files to Pulsar.

   ```bash
   docker cp pulsar-io-kafka.nar pulsar-kafka-standalone:/pulsar
   docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
   ```

6. Open a new terminal window and start the Kafka source connector in local run mode.

   ```bash
   docker exec -it pulsar-kafka-standalone /bin/bash

   ./bin/pulsar-admin source localrun \
   --archive ./pulsar-io-kafka.nar \
   --tenant public \
   --namespace default \
   --name kafka \
   --destination-topic-name my-topic \
   --source-config-file ./conf/kafkaSourceConfig.yaml \
   --parallelism 1
   ```
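
   `localrun` starts the connector as a local process inside the container, which is convenient for testing. On a production cluster, you would submit the connector with `pulsar-admin sources create` instead, as shown in the on-premises example below.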

7. Open a new terminal window and run the Kafka producer locally.

   ```bash
   python3 kafka-producer.py
   ```

8. Open a new terminal window and run the Pulsar consumer locally.

   ```bash
   python3 pulsar-client.py
   ```

   The following information appears on the consumer terminal window.

   ```bash
   Received message: 'hello world'
   ```

### On-premises cluster

This example explains how to create a Kafka source connector in an on-premises cluster.

1. Copy the NAR package of the Kafka connector to the Pulsar connectors directory.

   ```bash
   cp pulsar-io-kafka-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-kafka-{{connector:version}}.nar
   ```

2. Reload all [built-in connectors](/io-connectors.md).

   ```bash
   $PULSAR_HOME/bin/pulsar-admin sources reload
   ```

3. Check whether the Kafka source connector is available in the list.

   ```bash
   $PULSAR_HOME/bin/pulsar-admin sources available-sources
   ```

4. Create a Kafka source connector on a Pulsar cluster using the [`pulsar-admin sources create`](/tools/pulsar-admin/2.11.0-SNAPSHOT/#-em-create-em--14) command.

   ```bash
   $PULSAR_HOME/bin/pulsar-admin sources create \
   --source-config-file <kafka-source-config.yaml>
   ```
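
   As a sketch, the source configuration file passed to `--source-config-file` could look like the following; the tenant, namespace, topic names, and Kafka settings shown here are placeholder values:

   ```yaml
   tenant: "public"
   namespace: "default"
   name: "kafka"
   topicName: "my-topic"
   archive: "connectors/pulsar-io-kafka-{{connector:version}}.nar"
   parallelism: 1
   configs:
     bootstrapServers: "localhost:9092"
     groupId: "test-pulsar-io"
     topic: "my-topic"
   ```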