This guide describes how to use Pulsar connectors.
Pulsar bundles several built-in connectors that move data in and out of commonly used systems (such as databases and messaging systems). You can also create and use your own non-built-in connectors.
Note
When using a non-built-in connector, you need to specify the path of the connector's archive file.
To set up a built-in connector, follow the instructions here.
After the setup, the built-in connector is automatically discovered by Pulsar brokers (or function workers), so no additional installation steps are required.
You can configure the following information:
To configure a default folder for built-in connectors, set the connectorsDirectory parameter in the ./conf/functions_worker.yml configuration file.
Example
Set the ./connectors folder as the default storage location for built-in connectors.
########################
# Connectors
########################

connectorsDirectory: ./connectors
To configure a connector, you need to provide a YAML configuration file when creating a connector.
The YAML configuration file tells Pulsar where to locate connectors and how to connect connectors with Pulsar topics.
Example 1
Below is a YAML configuration file of a Cassandra sink, which tells Pulsar:
Which Cassandra cluster to connect to
Which keyspace and columnFamily to use in Cassandra for collecting data
How to map Pulsar messages into Cassandra table key and columns
tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
    roots: "localhost:9042"
    keyspace: "pulsar_test_keyspace"
    columnFamily: "pulsar_test_table"
    keyname: "key"
    columnName: "col"
Example 2
Below is a YAML configuration file of a Kafka source.
configs:
    bootstrapServers: "pulsar-kafka:9092"
    groupId: "test-pulsar-io"
    topic: "my-topic"
    sessionTimeoutMs: "10000"
    autoCommitEnabled: "false"
Example 3
Below is a YAML configuration file of a PostgreSQL JDBC sink.
configs:
    userName: "postgres"
    password: "password"
    jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
    tableName: "test_jdbc"
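The three examples above share one shape: top-level identifiers plus a configs block of connector-specific keys. If you generate these files from Java, a minimal sketch could look like the following. ConnectorYaml is a hypothetical helper, not a Pulsar API, and it assumes the values need no YAML escaping.

```java
import java.util.Map;

/**
 * Hypothetical helper (not part of Pulsar) that renders a connector YAML file in
 * the layout of the examples above: tenant, namespace, and name at the top level,
 * followed by a connector-specific "configs" block.
 * Assumption: values are plain strings with no quotes, backslashes, or line breaks,
 * so they can be wrapped in double quotes without escaping.
 */
final class ConnectorYaml {
    private ConnectorYaml() {}

    static String render(String tenant, String namespace, String name, Map<String, String> configs) {
        StringBuilder yaml = new StringBuilder();
        yaml.append("tenant: ").append(tenant).append('\n')
            .append("namespace: ").append(namespace).append('\n')
            .append("name: ").append(name).append('\n')
            .append("configs:\n");
        // Each connector-specific key is indented by four spaces under "configs".
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            yaml.append("    ").append(entry.getKey())
                .append(": \"").append(entry.getValue()).append("\"\n");
        }
        return yaml.toString();
    }
}
```

Pass a LinkedHashMap so the keys keep their insertion order. For values containing quotes or line breaks, a YAML library is the safer choice.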
Before using connectors, you can perform the following operations:
reload
If you add or delete a NAR file in a connector folder, reload the available built-in connectors before using them.
To reload source connectors, use the reload subcommand.
$ pulsar-admin sources reload
For more information, see here.
To reload sink connectors, use the reload subcommand.
$ pulsar-admin sinks reload
For more information, see here.
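Because a reload is only needed after NAR files are added or deleted, a deployment script could compare snapshots of the connector folder before deciding to run the reload subcommand. The sketch below uses only java.nio.file; NarSnapshot is a hypothetical name, and it looks at file names only, without contacting Pulsar.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;

/**
 * Hypothetical pre-flight check: snapshot the NAR file names in a connectors
 * folder and compare with an earlier snapshot to decide whether a reload is due.
 * It only inspects the local file system; it does not talk to Pulsar.
 */
final class NarSnapshot {
    private NarSnapshot() {}

    /** Returns the sorted names of all *.nar files directly inside the directory. */
    static Set<String> take(Path connectorsDir) {
        try (Stream<Path> entries = Files.list(connectorsDir)) {
            Set<String> names = new TreeSet<>();
            entries.filter(Files::isRegularFile)
                   .map(p -> p.getFileName().toString())
                   .filter(n -> n.endsWith(".nar"))
                   .forEach(names::add);
            return names;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** True when any NAR file was added or deleted between the two snapshots. */
    static boolean reloadNeeded(Set<String> before, Set<String> after) {
        return !before.equals(after);
    }
}
```

Replacing a NAR file with a new version under the same name leaves the snapshot unchanged, so this check would miss that case.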
available
After reloading connectors (optional), you can get a list of available connectors.
To list the available source connectors, use the available-sources subcommand.
$ pulsar-admin sources available-sources
To list the available sink connectors, use the available-sinks subcommand.
$ pulsar-admin sinks available-sinks
To run a connector, you can perform the following operations:
create
You can create a connector using the Admin CLI, REST API, or Java admin API.
Create a source connector.
With the Admin CLI, use the create subcommand.
$ pulsar-admin sources create options
For more information, see here.
With the REST API, send a POST request to this endpoint: /admin/v3/sources/:tenant/:namespace/:sourceName
With the Java admin API, create a source connector with a local file.
void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException
Parameter
Name | Description |
---|---|
sourceConfig | The source configuration object |
fileName | The path of the local file that contains the connector package |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see createSource.
Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException
Supported URLs are http and file.
Parameter
Parameter | Description |
---|---|
sourceConfig | The source configuration object |
pkgUrl | URL from which pkg can be downloaded |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see createSourceWithUrl.
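createSourceWithUrl (and createSinkWithUrl below) accept only http and file URLs. A client-side check like the following can reject other schemes before calling the admin API. PackageUrls is a hypothetical helper, and the server may apply its own, different validation.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;

/**
 * Hypothetical client-side check for the pkgUrl argument of createSourceWithUrl
 * and createSinkWithUrl. It accepts only the two schemes named in this guide,
 * http and file; Pulsar's own validation may differ.
 */
final class PackageUrls {
    private PackageUrls() {}

    static boolean isSupported(String pkgUrl) {
        try {
            String scheme = new URI(pkgUrl).getScheme(); // null for relative paths
            return "http".equalsIgnoreCase(scheme) || "file".equalsIgnoreCase(scheme);
        } catch (URISyntaxException e) {
            return false; // not a syntactically valid URI
        }
    }

    /** Converts a local archive path into an absolute file: URL string. */
    static String fromLocalPath(Path archive) {
        return archive.toAbsolutePath().toUri().toString();
    }
}
```

A file: URL presumably has to point to a path that is readable where the package is fetched, not only on the machine running the client.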
Create a sink connector.
With the Admin CLI, use the create subcommand.
$ pulsar-admin sinks create options
For more information, see here.
With the REST API, send a POST request to this endpoint: /admin/v3/sinks/:tenant/:namespace/:sinkName
With the Java admin API, create a sink connector with a local file.
void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException
Parameter
Name | Description |
---|---|
sinkConfig | The sink configuration object |
fileName | The path of the local file that contains the connector package |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see createSink.
Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException
Supported URLs are http and file.
Parameter
Parameter | Description |
---|---|
sinkConfig | The sink configuration object |
pkgUrl | URL from which pkg can be downloaded |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see createSinkWithUrl.
start
You can start a connector using the Admin CLI or REST API.
Start a source connector.
With the Admin CLI, use the start subcommand.
$ pulsar-admin sources start options
For more information, see here.
Start all instances of a source connector.
With the REST API, send a POST request to this endpoint: /admin/v3/sources/:tenant/:namespace/:sourceName/start
Start a specified instance of a source connector.
Send a POST request to this endpoint: /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/start
Start a sink connector.
With the Admin CLI, use the start subcommand.
$ pulsar-admin sinks start options
For more information, see here.
Start all instances of a sink connector.
With the REST API, send a POST request to this endpoint: /admin/v3/sinks/:tenant/:namespace/:sinkName/start
Start a specified instance of a sink connector.
Send a POST request to this endpoint: /admin/v3/sinks/:tenant/:namespace/:sinkName/:instanceId/start
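The start endpoints above follow a pattern that the other instance-level actions in this guide reuse: the connector path, an optional instance ID, and an action suffix. The sketch below builds those paths as strings. ConnectorPaths is hypothetical, and it assumes tenant, namespace, and connector names contain only URL-safe characters (it does no percent-encoding).

```java
/**
 * Hypothetical helper that builds admin REST paths such as
 * /admin/v3/sinks/:tenant/:namespace/:sinkName/start or
 * /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/start.
 * A null instanceId targets all instances; a number targets one instance.
 */
final class ConnectorPaths {
    private ConnectorPaths() {}

    /** kind is "sources" or "sinks"; action is a suffix such as "start". */
    static String action(String kind, String tenant, String namespace, String name,
                         Integer instanceId, String action) {
        if (!kind.equals("sources") && !kind.equals("sinks")) {
            throw new IllegalArgumentException("kind must be sources or sinks: " + kind);
        }
        StringBuilder path = new StringBuilder("/admin/v3/")
            .append(kind).append('/').append(tenant).append('/')
            .append(namespace).append('/').append(name);
        if (instanceId != null) {
            path.append('/').append(instanceId); // per-instance variant
        }
        return path.append('/').append(action).toString();
    }
}
```

Prefix the result with your admin service URL to form the full request URL.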
localrun
Using the Admin CLI, you can run a connector locally rather than deploying it on a Pulsar cluster.
Run a source connector locally.
Use the localrun subcommand.
$ pulsar-admin sources localrun options
For more information, see here.
Run a sink connector locally.
Use the localrun subcommand.
$ pulsar-admin sinks localrun options
For more information, see here.
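If you script local runs from Java, you can wrap the localrun subcommand in a ProcessBuilder. This sketch assumes pulsar-admin lives at bin/pulsar-admin under the Pulsar installation directory and passes only the --sink-config-file option with a YAML file like the ones shown earlier. LocalRunLauncher is a hypothetical helper.

```java
import java.nio.file.Path;
import java.util.List;

/**
 * Hypothetical launcher for "pulsar-admin sinks localrun". Assumes the script is
 * at bin/pulsar-admin under pulsarHome and that --sink-config-file points at a
 * YAML connector config (run "pulsar-admin sinks localrun --help" to confirm
 * the options in your version). The builder is returned unstarted.
 */
final class LocalRunLauncher {
    private LocalRunLauncher() {}

    static ProcessBuilder sinkLocalRun(Path pulsarHome, Path sinkConfigYaml) {
        List<String> command = List.of(
            pulsarHome.resolve("bin/pulsar-admin").toString(),
            "sinks", "localrun",
            "--sink-config-file", sinkConfigYaml.toString());
        return new ProcessBuilder(command)
            .directory(pulsarHome.toFile())
            .inheritIO(); // stream the local runner's output to this console
    }
}
```

Call start() on the returned builder to launch the process. The local runner presumably stays in the foreground until stopped, so the caller is responsible for destroying the process.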
To monitor a connector, you can perform the following operations:
get
You can get the information of a connector using the Admin CLI, REST API, or Java admin API.
Get the information of a source connector.
With the Admin CLI, use the get subcommand.
$ pulsar-admin sources get options
For more information, see here.
With the REST API, send a GET request to this endpoint: /admin/v3/sources/:tenant/:namespace/:sourceName
SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException
Example
The following shows the structure of a sourceConfig.
{
  "tenant": "tenantName",
  "namespace": "namespaceName",
  "name": "sourceName",
  "className": "className",
  "topicName": "topicName",
  "configs": {},
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "resources": {
    "cpu": 1.0,
    "ram": 1073741824,
    "disk": 10737418240
  }
}
This is a sourceConfig example.
{
  "tenant": "public",
  "namespace": "default",
  "name": "debezium-mysql-source",
  "className": "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource",
  "topicName": "debezium-mysql-topic",
  "configs": {
    "database.user": "debezium",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.port": "3306",
    "database.hostname": "localhost",
    "database.password": "dbz",
    "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.whitelist": "inventory",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
    "pulsar.service.url": "pulsar://127.0.0.1:6650",
    "database.history.pulsar.topic": "history-topic2"
  },
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "resources": {
    "cpu": 1.0,
    "ram": 1073741824,
    "disk": 10737418240
  }
}
Exception
Exception name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException | Unexpected error |
For more information, see getSource.
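The resources block in these samples is easier to read once the units are known. Assuming ram and disk are given in bytes, 1073741824 is 2^30 bytes (1 GiB) and 10737418240 is 10 GiB. The hypothetical helper below does that conversion in both directions.

```java
/**
 * Hypothetical unit helper for the "resources" block of a connector config.
 * Assumption: "ram" and "disk" are expressed in bytes; 1 GiB = 2^30 bytes.
 */
final class ResourceUnits {
    private ResourceUnits() {}

    static final long BYTES_PER_GIB = 1L << 30; // 1,073,741,824 bytes

    /** Converts a byte count to GiB, e.g. for display. */
    static double toGiB(long bytes) {
        return (double) bytes / BYTES_PER_GIB;
    }

    /** Converts whole GiB to bytes, failing loudly on overflow. */
    static long fromGiB(long gib) {
        return Math.multiplyExact(gib, BYTES_PER_GIB);
    }
}
```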
Get the information of a sink connector.
With the Admin CLI, use the get subcommand.
$ pulsar-admin sinks get options
For more information, see here.
With the REST API, send a GET request to this endpoint: /admin/v3/sinks/:tenant/:namespace/:sinkName
SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException
Example
The following shows the structure of a sinkConfig.
{
  "tenant": "tenantName",
  "namespace": "namespaceName",
  "name": "sinkName",
  "className": "className",
  "inputSpecs": {
    "topicName": {
      "isRegexPattern": false
    }
  },
  "configs": {},
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": false,
  "autoAck": true
}
This is a sinkConfig example.
{
  "tenant": "public",
  "namespace": "default",
  "name": "pulsar-postgres-jdbc-sink",
  "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink",
  "inputSpecs": {
    "pulsar-postgres-jdbc-sink-topic": {
      "isRegexPattern": false
    }
  },
  "configs": {
    "password": "password",
    "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink",
    "userName": "postgres",
    "tableName": "pulsar_postgres_jdbc_sink"
  },
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": false,
  "autoAck": true
}
Parameter description
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
sink | Sink name |
For more information, see getSink.
list
You can get the list of all running connectors using the Admin CLI, REST API, or Java admin API.
Get the list of all running source connectors.
With the Admin CLI, use the list subcommand.
$ pulsar-admin sources list options
For more information, see here.
With the REST API, send a GET request to this endpoint: /admin/v3/sources/:tenant/:namespace
List<String> listSources(String tenant, String namespace) throws PulsarAdminException
Response example
["f1", "f2", "f3"]
Exception
Exception name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException | Unexpected error |
For more information, see listSources.
Get the list of all running sink connectors.
With the Admin CLI, use the list subcommand.
$ pulsar-admin sinks list options
For more information, see here.
With the REST API, send a GET request to this endpoint: /admin/v3/sinks/:tenant/:namespace
List<String> listSinks(String tenant, String namespace) throws PulsarAdminException
Response example
["f1", "f2", "f3"]
Exception
Exception name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException | Unexpected error |
For more information, see listSinks.
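Without the Java admin client, the list endpoint can be called with the JDK's java.net.http package. The sketch assumes an admin service reachable at a base URL you supply (http://localhost:8080 below is only an example) and omits authentication. ListConnectors is hypothetical, and parseNames handles only the flat array of plain names shown in the response example, so a JSON library is the better choice in real code.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper for GET /admin/v3/{sources|sinks}/:tenant/:namespace.
 * parseNames accepts only a JSON array of double-quoted names without
 * escape sequences or embedded commas, e.g. ["f1", "f2", "f3"].
 */
final class ListConnectors {
    private ListConnectors() {}

    static HttpRequest request(String baseUrl, String kind, String tenant, String namespace) {
        URI uri = URI.create(baseUrl + "/admin/v3/" + kind + "/" + tenant + "/" + namespace);
        return HttpRequest.newBuilder(uri)
            .header("Accept", "application/json")
            .GET()
            .build();
    }

    static List<String> parseNames(String json) {
        String body = json.trim();
        if (!body.startsWith("[") || !body.endsWith("]")) {
            throw new IllegalArgumentException("expected a JSON array: " + json);
        }
        List<String> names = new ArrayList<>();
        String inner = body.substring(1, body.length() - 1).trim();
        if (inner.isEmpty()) {
            return names; // "[]" means no running connectors
        }
        for (String item : inner.split(",")) {
            String quoted = item.trim();
            if (quoted.length() < 2 || !quoted.startsWith("\"") || !quoted.endsWith("\"")) {
                throw new IllegalArgumentException("unexpected element: " + quoted);
            }
            names.add(quoted.substring(1, quoted.length() - 1)); // strip the quotes
        }
        return names;
    }
}
```

To send the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and feed the response body to parseNames.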
status
You can get the current status of a connector using the Admin CLI, REST API, or Java admin API.
Get the current status of a source connector.
With the Admin CLI, use the status subcommand.
$ pulsar-admin sources status options
For more information, see here.
Get the current status of all instances of a source connector.
With the REST API, send a GET request to this endpoint: /admin/v3/sources/:tenant/:namespace/:sourceName/status
Get the current status of a specified instance of a source connector.
Send a GET request to this endpoint: /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/status
With the Java admin API, get the current status of all instances of a source connector.
SourceStatus getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminException
Parameter
Parameter | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
source | Source name |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see getSourceStatus.
Get the current status of a specified instance of a source connector.
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant, String namespace, String source, int id) throws PulsarAdminException
Parameter
Parameter | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
source | Source name |
id | Source instanceID |
Exception
Exception name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see getSourceStatus.
Get the current status of a Pulsar sink connector.
With the Admin CLI, use the status subcommand.
$ pulsar-admin sinks status options
For more information, see here.
Get the current status of all instances of a sink connector.
With the REST API, send a GET request to this endpoint: /admin/v3/sinks/:tenant/:namespace/:sinkName/status
Get the current status of a specified instance of a sink connector.
Send a GET request to this endpoint: /admin/v3/sinks/:tenant/:namespace/:sinkName/:instanceId/status
With the Java admin API, get the current status of all instances of a sink connector.
SinkStatus getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminException
Parameter
Parameter | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
sink | Sink name |
Exception
Exception name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see getSinkStatus.
Get the current status of a specified instance of a sink connector.
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant, String namespace, String sink, int id) throws PulsarAdminException
Parameter
Parameter | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
sink | Sink name |
id | Sink instanceID |
Exception
Exception name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see getSinkStatusWithInstanceID.
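After starting or restarting a connector, you may want to wait until its instances report running. The polling loop below is a sketch under these assumptions: runningCount is a hook you wire to getSourceStatus, getSinkStatus, or the status endpoint (for example, reading the number of running instances from the returned status), and StatusPoller is a hypothetical name, not a Pulsar API.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.IntSupplier;

/**
 * Hypothetical poller: repeatedly asks runningCount how many instances are
 * running and returns true once it reaches expectedInstances, or false when
 * the timeout expires first. It does not call Pulsar itself.
 */
final class StatusPoller {
    private StatusPoller() {}

    static boolean waitUntilRunning(IntSupplier runningCount, int expectedInstances,
                                    Duration interval, Duration timeout) throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (true) {
            if (runningCount.getAsInt() >= expectedInstances) {
                return true;
            }
            if (!Instant.now().isBefore(deadline)) {
                return false; // not all instances came up in time
            }
            Thread.sleep(interval.toMillis());
        }
    }
}
```

Because getSourceStatus and getSinkStatus throw the checked PulsarAdminException, a lambda passed as runningCount needs to catch it and rethrow an unchecked exception.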
update
You can update a running connector using the Admin CLI, REST API, or Java admin API.
Update a running Pulsar source connector.
With the Admin CLI, use the update subcommand.
$ pulsar-admin sources update options
For more information, see here.
With the REST API, send a PUT request to this endpoint: /admin/v3/sources/:tenant/:namespace/:sourceName
With the Java admin API, update a running source connector with a local file.
void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException
Parameter
Name | Description |
---|---|
sourceConfig | The source configuration object |
fileName | The path of the local file that contains the connector package |
Exception
Name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException | Unexpected error |
For more information, see updateSource.
Update a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException
Supported URLs are http and file.
Parameter
Name | Description |
---|---|
sourceConfig | The source configuration object |
pkgUrl | URL from which pkg can be downloaded |
Exception
Name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException | Unexpected error |
For more information, see updateSourceWithUrl.
Update a running Pulsar sink connector.
With the Admin CLI, use the update subcommand.
$ pulsar-admin sinks update options
For more information, see here.
With the REST API, send a PUT request to this endpoint: /admin/v3/sinks/:tenant/:namespace/:sinkName
With the Java admin API, update a running sink connector with a local file.
void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException
Parameter
Name | Description |
---|---|
sinkConfig | The sink configuration object |
fileName | The path of the local file that contains the connector package |
Exception
Name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException | Unexpected error |
For more information, see updateSink.
Update a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException
Supported URLs are http and file.
Parameter
Name | Description |
---|---|
sinkConfig | The sink configuration object |
pkgUrl | URL from which pkg can be downloaded |
Exception
Name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException | Unexpected error |
For more information, see updateSinkWithUrl.
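The PUT endpoint for sinks can also be called directly with the JDK HTTP client. The sketch below points the update at a package URL instead of uploading a file. It rests on assumptions not stated in this guide: the endpoint accepts multipart/form-data with a "url" part and a JSON "sinkConfig" part, and the caller supplies the base URL and the JSON. UpdateSinkRequest is hypothetical; check the REST API reference for your Pulsar version before relying on these part names.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Hypothetical builder for PUT /admin/v3/sinks/:tenant/:namespace/:sinkName.
 * ASSUMPTION: the server reads a multipart form with parts named "url"
 * (package URL) and "sinkConfig" (JSON). The request is built, not sent.
 */
final class UpdateSinkRequest {
    private UpdateSinkRequest() {}

    static HttpRequest withPackageUrl(String baseUrl, String tenant, String namespace,
                                      String sinkName, String pkgUrl, String sinkConfigJson) {
        String boundary = "----pulsar-" + UUID.randomUUID();
        String body = part(boundary, "url", "text/plain", pkgUrl)
            + part(boundary, "sinkConfig", "application/json", sinkConfigJson)
            + "--" + boundary + "--\r\n"; // closing delimiter
        URI uri = URI.create(baseUrl + "/admin/v3/sinks/" + tenant + "/" + namespace + "/" + sinkName);
        return HttpRequest.newBuilder(uri)
            .header("Content-Type", "multipart/form-data; boundary=" + boundary)
            .PUT(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8))
            .build();
    }

    /** Formats one form-data part: delimiter line, headers, blank line, value. */
    private static String part(String boundary, String name, String contentType, String value) {
        return "--" + boundary + "\r\n"
            + "Content-Disposition: form-data; name=\"" + name + "\"\r\n"
            + "Content-Type: " + contentType + "\r\n"
            + "\r\n"
            + value + "\r\n";
    }
}
```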
stop
You can stop a connector using the Admin CLI, REST API, or Java admin API.
Stop a source connector.
With the Admin CLI, use the stop subcommand.
$ pulsar-admin sources stop options
For more information, see here.
Stop all instances of a source connector.
With the REST API, send a POST request to this endpoint: /admin/v3/sources/:tenant/:namespace/:sourceName/stop
Stop a specified instance of a source connector.
Send a POST request to this endpoint: /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/stop
With the Java admin API, stop all instances of a source connector.
void stopSource(String tenant, String namespace, String source) throws PulsarAdminException
Parameter
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
source | Source name |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see stopSource.
Stop a specified instance of a source connector.
void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException
Parameter
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
source | Source name |
instanceId | Source instanceID |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see stopSource.
Stop a sink connector.
With the Admin CLI, use the stop subcommand.
$ pulsar-admin sinks stop options
For more information, see here.
Stop all instances of a sink connector.
With the REST API, send a POST request to this endpoint: /admin/v3/sinks/:tenant/:namespace/:sinkName/stop
Stop a specified instance of a sink connector.
Send a POST request to this endpoint: /admin/v3/sinks/:tenant/:namespace/:sinkName/:instanceId/stop
With the Java admin API, stop all instances of a sink connector.
void stopSink(String tenant, String namespace, String sink) throws PulsarAdminException
Parameter
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
sink | Sink name |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see stopSink.
Stop a specified instance of a sink connector.
void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException
Parameter
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
sink | Sink name |
instanceId | Sink instanceID |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see stopSink.
restart
You can restart a connector using the Admin CLI, REST API, or Java admin API.
Restart a source connector.
With the Admin CLI, use the restart subcommand.
$ pulsar-admin sources restart options
For more information, see here.
Restart all instances of a source connector.
With the REST API, send a POST request to this endpoint: /admin/v3/sources/:tenant/:namespace/:sourceName/restart
Restart a specified instance of a source connector.
Send a POST request to this endpoint: /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/restart
With the Java admin API, restart all instances of a source connector.
void restartSource(String tenant, String namespace, String source) throws PulsarAdminException
Parameter
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
source | Source name |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see restartSource.
Restart a specified instance of a source connector.
void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException
Parameter
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
source | Source name |
instanceId | Source instanceID |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see restartSource.
Restart a sink connector.
With the Admin CLI, use the restart subcommand.
$ pulsar-admin sinks restart options
For more information, see here.
Restart all instances of a sink connector.
With the REST API, send a POST request to this endpoint: /admin/v3/sinks/:tenant/:namespace/:sinkName/restart
Restart a specified instance of a sink connector.
Send a POST request to this endpoint: /admin/v3/sinks/:tenant/:namespace/:sinkName/:instanceId/restart
With the Java admin API, restart all instances of a sink connector.
void restartSink(String tenant, String namespace, String sink) throws PulsarAdminException
Parameter
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
sink | Sink name |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see restartSink.
Restart a specified instance of a sink connector.
void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException
Parameter
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
sink | Sink name |
instanceId | Sink instanceID |
Exception
Name | Description |
---|---|
PulsarAdminException | Unexpected error |
For more information, see restartSink.
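Because restartSource and restartSink also have per-instance forms, a rolling restart is possible: restart one instance, pause, then move on. The sketch below assumes instance IDs run from 0 to parallelism - 1. restartInstance is a caller-supplied hook (for example, a lambda around restartSink(tenant, namespace, sink, instanceId)), and RollingRestart is hypothetical; this loop is not part of the admin API described here.

```java
import java.time.Duration;
import java.util.function.IntConsumer;

/**
 * Hypothetical rolling restart: calls restartInstance for each instance ID
 * in order (assumed to be 0..parallelism-1), pausing between instances.
 */
final class RollingRestart {
    private RollingRestart() {}

    static void run(int parallelism, IntConsumer restartInstance, Duration pause)
            throws InterruptedException {
        for (int instanceId = 0; instanceId < parallelism; instanceId++) {
            restartInstance.accept(instanceId);
            if (instanceId < parallelism - 1) {
                // Give the restarted instance time to come back before the next one.
                Thread.sleep(pause.toMillis());
            }
        }
    }
}
```

A fixed pause does not confirm that an instance is healthy again; checking the instance status between steps is more reliable. As with any IntConsumer, a lambda wrapping restartSink must convert the checked PulsarAdminException into an unchecked one.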
delete
You can delete a connector using the Admin CLI, REST API, or Java admin API.
Delete a source connector.
With the Admin CLI, use the delete subcommand.
$ pulsar-admin sources delete options
For more information, see here.
Delete a source connector.
With the REST API, send a DELETE request to this endpoint: /admin/v3/sources/:tenant/:namespace/:sourceName
With the Java admin API, delete a source connector.
void deleteSource(String tenant, String namespace, String source) throws PulsarAdminException
Parameter
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
source | Source name |
Exception
Name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException.PreconditionFailedException | Cluster is not empty |
PulsarAdminException | Unexpected error |
For more information, see deleteSource.
Delete a sink connector.
With the Admin CLI, use the delete subcommand.
$ pulsar-admin sinks delete options
For more information, see here.
Delete a sink connector.
With the REST API, send a DELETE request to this endpoint: /admin/v3/sinks/:tenant/:namespace/:sinkName
With the Java admin API, delete a sink connector.
void deleteSink(String tenant, String namespace, String sink) throws PulsarAdminException
Parameter
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
sink | Sink name |
Exception
Name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException.PreconditionFailedException | Cluster is not empty |
PulsarAdminException | Unexpected error |
For more information, see deleteSink.