RabbitMQ sink connector
note
You can download all the Pulsar connectors from the download page.
The RabbitMQ sink connector pulls messages from Pulsar topics and persists the messages to RabbitMQ queues.
Configuration
The configuration of the RabbitMQ sink connector has the following properties.
Property
Name | Type | Required | Default | Description |
---|---|---|---|---|
connectionName | String | true | (empty string) | The connection name. |
host | String | true | (empty string) | The RabbitMQ host. |
port | int | true | 5672 | The RabbitMQ port. |
virtualHost | String | true | / | The virtual host used to connect to RabbitMQ. |
username | String | false | guest | The username used to authenticate to RabbitMQ. |
password | String | false | guest | The password used to authenticate to RabbitMQ. |
queueName | String | true | (empty string) | The RabbitMQ queue name that messages should be read from or written to. |
requestedChannelMax | int | false | 0 | The initially requested maximum channel number. 0 means unlimited. |
requestedFrameMax | int | false | 0 | The initially requested maximum frame size in octets. 0 means unlimited. |
connectionTimeout | int | false | 60000 | The timeout of TCP connection establishment in milliseconds. 0 means infinite. |
handshakeTimeout | int | false | 10000 | The timeout of the AMQP 0-9-1 protocol handshake in milliseconds. |
requestedHeartbeat | int | false | 60 | The requested heartbeat timeout in seconds. |
exchangeName | String | true | (empty string) | The exchange to publish messages to. |
routingKey | String | true | (empty string) | The routing key used to publish messages. |
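
The table above can be read as a small schema. The following is a minimal sketch in Python, not part of the connector itself, that applies the documented defaults and reports required properties that are still empty. The helper name `resolve_config` and the rule that a required property must be non-empty after defaults are applied are assumptions made for illustration; the connector's own validation may differ.

```python
# Property table transcribed from the documentation above:
# name -> (type, required, default)
PROPERTIES = {
    "connectionName": (str, True, ""),
    "host": (str, True, ""),
    "port": (int, True, 5672),
    "virtualHost": (str, True, "/"),
    "username": (str, False, "guest"),
    "password": (str, False, "guest"),
    "queueName": (str, True, ""),
    "requestedChannelMax": (int, False, 0),
    "requestedFrameMax": (int, False, 0),
    "connectionTimeout": (int, False, 60000),
    "handshakeTimeout": (int, False, 10000),
    "requestedHeartbeat": (int, False, 60),
    "exchangeName": (str, True, ""),
    "routingKey": (str, True, ""),
}


def resolve_config(user_config):
    """Hypothetical helper: apply defaults, coerce types, check required fields."""
    unknown = set(user_config) - set(PROPERTIES)
    if unknown:
        raise ValueError(f"unknown properties: {sorted(unknown)}")

    resolved = {}
    missing = []
    for name, (kind, required, default) in PROPERTIES.items():
        value = user_config.get(name, default)
        # Coerce to the documented type, so a quoted number such as "5672"
        # becomes the integer 5672.
        value = kind(value)
        # Assumption: "required" means the value must not be an empty string
        # once defaults have been applied.
        if required and value == "":
            missing.append(name)
        resolved[name] = value
    if missing:
        raise ValueError(f"missing required properties: {missing}")
    return resolved


if __name__ == "__main__":
    resolved = resolve_config({
        "connectionName": "test-connection",
        "host": "localhost",
        "port": "5672",
        "queueName": "test-queue",
        "exchangeName": "test-exchange",
        "routingKey": "test-key",
    })
    print(resolved["port"], resolved["connectionTimeout"])
```

Running a check like this before deploying catches a missing `exchangeName` or `routingKey` early, without needing a running broker.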
Example
Before using the RabbitMQ sink connector, you need to create a configuration file in one of the following formats.
JSON
{
    "configs": {
        "host": "localhost",
        "port": "5672",
        "virtualHost": "/",
        "username": "guest",
        "password": "guest",
        "queueName": "test-queue",
        "connectionName": "test-connection",
        "requestedChannelMax": "0",
        "requestedFrameMax": "0",
        "connectionTimeout": "60000",
        "handshakeTimeout": "10000",
        "requestedHeartbeat": "60",
        "exchangeName": "test-exchange",
        "routingKey": "test-key"
    }
}
YAML
configs:
    host: "localhost"
    port: 5672
    virtualHost: "/"
    username: "guest"
    password: "guest"
    queueName: "test-queue"
    connectionName: "test-connection"
    requestedChannelMax: 0
    requestedFrameMax: 0
    connectionTimeout: 60000
    handshakeTimeout: 10000
    requestedHeartbeat: 60
    exchangeName: "test-exchange"
    routingKey: "test-key"
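
If the configuration is produced by a script rather than written by hand, the JSON form can be generated with a few lines of Python. This is only a sketch: the helper `write_sink_config` and the file name `rabbitmq-sink.json` are hypothetical, and the numeric settings are written as integers on the assumption that the connector accepts them as it does in the YAML example.

```python
import json
import tempfile
from pathlib import Path

# Values copied from the example above; numeric settings are integers here,
# matching the YAML form.
EXAMPLE_CONFIGS = {
    "host": "localhost",
    "port": 5672,
    "virtualHost": "/",
    "username": "guest",
    "password": "guest",
    "queueName": "test-queue",
    "connectionName": "test-connection",
    "requestedChannelMax": 0,
    "requestedFrameMax": 0,
    "connectionTimeout": 60000,
    "handshakeTimeout": 10000,
    "requestedHeartbeat": 60,
    "exchangeName": "test-exchange",
    "routingKey": "test-key",
}


def write_sink_config(path, configs):
    """Hypothetical helper: nest the settings under "configs" and write JSON."""
    document = {"configs": configs}
    Path(path).write_text(json.dumps(document, indent=4) + "\n", encoding="utf-8")
    return document


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "rabbitmq-sink.json"  # hypothetical file name
        write_sink_config(target, EXAMPLE_CONFIGS)
        print(target.read_text(encoding="utf-8"))
```

The only structural requirement shown in the examples is that all connector settings sit under a top-level `configs` key, which is what the helper enforces.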