Kinesis sink connector
You can download all the Pulsar connectors from the download page.
The Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis.
Configuration
The configuration of the Kinesis sink connector has the following properties.
Property
Name | Type | Required | Default | Description |
---|---|---|---|---|
messageFormat | MessageFormat | true | ONLY_RAW_PAYLOAD | Message format in which Kinesis sink converts Pulsar messages and publishes to Kinesis streams. Below are the available options: ONLY_RAW_PAYLOAD: Kinesis sink directly publishes Pulsar message payload as a message into the configured Kinesis stream. FULL_MESSAGE_IN_JSON: Kinesis sink creates a JSON payload with Pulsar message payload, properties and encryptionCtx, and publishes JSON payload into the configured Kinesis stream. FULL_MESSAGE_IN_FB: Kinesis sink creates a flatbuffer serialized payload with Pulsar message payload, properties and encryptionCtx, and publishes flatbuffer payload into the configured Kinesis stream. FULL_MESSAGE_IN_JSON_EXPAND_VALUE: Kinesis sink sends a JSON structure containing the record topic name, key, payload, properties and event time. The record schema is used to convert the value to JSON. |
jsonIncludeNonNulls | boolean | false | true | Only the properties with non-null values are included when the message format is FULL_MESSAGE_IN_JSON_EXPAND_VALUE. |
jsonFlatten | boolean | false | false | When it is set to true and the message format is FULL_MESSAGE_IN_JSON_EXPAND_VALUE, the output JSON is flattened. |
retainOrdering | boolean | false | false | Whether the connector retains message ordering when moving messages from Pulsar to Kinesis. |
awsEndpoint | String | false | " " (empty string) | The Kinesis endpoint URL, which can be found here. |
awsRegion | String | false | " " (empty string) | The AWS region, for example, us-west-1 or us-west-2. |
awsKinesisStreamName | String | true | " " (empty string) | The Kinesis stream name. |
awsCredentialPluginName | String | false | " " (empty string) | The fully qualified class name of an implementation of AwsCredentialProviderPlugin. It is a factory class that creates the AWSCredentialsProvider used by the Kinesis sink. If it is empty, the Kinesis sink creates a default AWSCredentialsProvider, which accepts a JSON map of credentials in awsCredentialPluginParam. |
awsCredentialPluginParam | String | false | " " (empty string) | The JSON parameter to initialize awsCredentialsProviderPlugin. |
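The properties above lend themselves to a quick pre-flight check before a configuration is submitted. The following is a minimal sketch, not part of the connector: `validate_kinesis_sink_config` is a hypothetical helper, and treating a missing `messageFormat` as the documented default is an assumption. The property names and allowed values are taken from the table.

```python
# Hypothetical pre-flight check for a Kinesis sink "configs" map.
# Property names and allowed values come from the table above;
# the helper itself is an illustration, not a Pulsar API.

MESSAGE_FORMATS = {
    "ONLY_RAW_PAYLOAD",
    "FULL_MESSAGE_IN_JSON",
    "FULL_MESSAGE_IN_FB",
    "FULL_MESSAGE_IN_JSON_EXPAND_VALUE",
}

# Options documented as applying only to FULL_MESSAGE_IN_JSON_EXPAND_VALUE.
EXPAND_ONLY = ("jsonIncludeNonNulls", "jsonFlatten")


def validate_kinesis_sink_config(configs: dict) -> list:
    """Return a list of problems; an empty list means none were found."""
    problems = []

    stream = configs.get("awsKinesisStreamName", "")
    if not str(stream).strip():
        problems.append("awsKinesisStreamName is required and must not be empty")

    # Assumption: a missing messageFormat falls back to the documented default.
    fmt = configs.get("messageFormat", "ONLY_RAW_PAYLOAD")
    if fmt not in MESSAGE_FORMATS:
        problems.append(f"unknown messageFormat: {fmt!r}")
    elif fmt != "FULL_MESSAGE_IN_JSON_EXPAND_VALUE":
        for name in EXPAND_ONLY:
            if name in configs:
                problems.append(
                    f"{name} only applies to FULL_MESSAGE_IN_JSON_EXPAND_VALUE"
                )

    return problems
```

Calling the helper with `{"awsKinesisStreamName": "my-stream", "messageFormat": "ONLY_RAW_PAYLOAD"}` returns an empty list. Setting `jsonFlatten` alongside `ONLY_RAW_PAYLOAD` is reported, because the table documents that option only for the expanded JSON format.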
Built-in plugins
The following are built-in AwsCredentialProviderPlugin plugins:
org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin
This plugin takes no configuration; it uses the default AWS provider chain.
For more information, see AWS documentation.
org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin
This plugin takes a configuration (via the awsCredentialPluginParam) that describes a role to assume when running the KCL. This configuration takes the form of a small JSON document like:
{"roleArn": "arn...", "roleSessionName": "name"}
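Because `awsCredentialPluginParam` is a String property, the role document has to be passed as JSON-encoded text rather than as a nested object. The sketch below shows one way to assemble such a configuration. `sts_assume_role_configs` is a hypothetical helper, and the role ARN, session name, stream, and region in the usage lines are placeholders, not values from this page.

```python
import json

STS_PLUGIN = "org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin"


def sts_assume_role_configs(role_arn: str, session_name: str,
                            stream: str, region: str) -> dict:
    """Build a sink "configs" map that selects the STS assume-role plugin."""
    # The plugin parameter is a JSON document serialized into a string.
    param = json.dumps({"roleArn": role_arn, "roleSessionName": session_name})
    return {
        "awsRegion": region,
        "awsKinesisStreamName": stream,
        "awsCredentialPluginName": STS_PLUGIN,
        "awsCredentialPluginParam": param,
    }


# Placeholder values for illustration only.
configs = sts_assume_role_configs(
    role_arn="arn:aws:iam::123456789012:role/example-role",
    session_name="pulsar-kinesis-sink",
    stream="my-stream",
    region="us-east-1",
)
print(json.dumps({"configs": configs}, indent=2))
```

Building the parameter with `json.dumps` avoids escaping the inner quotes by hand.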
Example
Before using the Kinesis sink connector, you need to create a configuration file through one of the following methods.
JSON
{
    "configs": {
        "awsEndpoint": "some.endpoint.aws",
        "awsRegion": "us-east-1",
        "awsKinesisStreamName": "my-stream",
        "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
        "messageFormat": "ONLY_RAW_PAYLOAD",
        "retainOrdering": "true"
    }
}
YAML
configs:
    awsEndpoint: "some.endpoint.aws"
    awsRegion: "us-east-1"
    awsKinesisStreamName: "my-stream"
    awsCredentialPluginParam: "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"
    messageFormat: "ONLY_RAW_PAYLOAD"
    retainOrdering: "true"
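The escaped quotes in `awsCredentialPluginParam` come from embedding one JSON document inside another as a string. As a rough sketch of how the JSON variant of this file could be generated instead of written by hand, the hypothetical helper `render_sink_config` below serializes the credentials first and then the outer document. The key and endpoint values are the sample values from the example above.

```python
import json


def render_sink_config(access_key: str, secret_key: str) -> str:
    """Return the JSON text of a sink configuration using the default credentials provider."""
    # Inner document: compact separators reproduce the form used in the example.
    credentials = json.dumps(
        {"accessKey": access_key, "secretKey": secret_key},
        separators=(",", ":"),
    )
    document = {
        "configs": {
            "awsEndpoint": "some.endpoint.aws",
            "awsRegion": "us-east-1",
            "awsKinesisStreamName": "my-stream",
            # Stored as a string, so the outer dump escapes its quotes.
            "awsCredentialPluginParam": credentials,
            "messageFormat": "ONLY_RAW_PAYLOAD",
            "retainOrdering": "true",
        }
    }
    return json.dumps(document, indent=4)


text = render_sink_config("myKey", "my-Secret")
print(text)
```

Reading the result back takes two decoding steps: `json.loads(text)` for the outer document, then `json.loads` again on the `awsCredentialPluginParam` value.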