blob: 068daee7cd17c8c20a5ccd82aaeecf52a095075f [file] [log] [blame] [view]
import ChangeLog from '../changelog/connector-rabbitmq.md';
# Rabbitmq
> Rabbitmq source connector
## Description
Used to read data from Rabbitmq.
## Key features
- [ ] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
:::tip
The source must be non-parallel (parallelism set to 1) in order to achieve exactly-once. This limitation is mainly due to RabbitMQs approach to dispatching messages from a single queue to multiple consumers.
:::
## Options
| name | type | required | default value |
| -------------------------- | ------- | -------- | ------------- |
| host | string | yes | - |
| port | int | yes | - |
| virtual_host | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| queue_name | string | yes | - |
| schema | config | yes | - |
| url | string | no | - |
| routing_key | string | no | - |
| exchange | string | no | - |
| network_recovery_interval | int | no | - |
| topology_recovery_enabled | boolean | no | - |
| automatic_recovery_enabled | boolean | no | - |
| connection_timeout | int | no | - |
| requested_channel_max | int | no | - |
| requested_frame_max | int | no | - |
| requested_heartbeat | int | no | - |
| prefetch_count | int | no | - |
| delivery_timeout | long | no | - |
| common-options | | no | - |
| durable | boolean | no | true |
| exclusive | boolean | no | false |
| auto_delete | boolean | no | false |
### host [string]
the default host to use for connections
### port [int]
the default port to use for connections
### virtual_host [string]
virtual host the virtual host to use when connecting to the broker
### username [string]
the AMQP user name to use when connecting to the broker
### password [string]
the password to use when connecting to the broker
### url [string]
convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host
### queue_name [string]
the queue to publish the message to
### routing_key [string]
the routing key to publish the message to
### exchange [string]
the exchange to publish the message to
### schema [Config]
#### fields [Config]
the schema fields of upstream data.
### network_recovery_interval [int]
how long will automatic recovery wait before attempting to reconnect, in ms
### topology_recovery [string]
if true, enables topology recovery
### automatic_recovery [string]
if true, enables connection recovery
### connection_timeout [int]
connection tcp establishment timeout in milliseconds; zero for infinite
### requested_channel_max [int]
initially requested maximum channel number; zero for unlimited
**Note: Note the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
### requested_frame_max [int]
the requested maximum frame size
### requested_heartbeat [int]
Set the requested heartbeat timeout
**Note: Note the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
### prefetch_count [int]
prefetchCount the max number of messages to receive without acknowledgement
### delivery_timeout [long]
deliveryTimeout maximum wait time, in milliseconds, for the next message delivery
### common options
Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details
### durable
- true: The queue will survive on server restart.
- false: The queue will be deleted on server restart.
### exclusive
- true: The queue is used only by the current connection and will be deleted when the connection closes.
- false: The queue can be used by multiple connections.
### auto-delete
- true: The queue will be deleted automatically when the last consumer unsubscribes.
- false: The queue will not be automatically deleted.
## Example
simple:
```hocon
source {
RabbitMQ {
host = "rabbitmq-e2e"
port = 5672
virtual_host = "/"
username = "guest"
password = "guest"
queue_name = "test"
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
}
}
}
}
```
## Changelog
<ChangeLog />