Create an S3 Sink Connector
The Amazon S3 Sink connector exports Apache Kafka messages to files in AWS S3 buckets.
Prerequisites
Before you can create an AWS S3 Sink connector in Redpanda Cloud, you must complete these tasks:
- Create an S3 bucket that you will send data to.
- Create an IAM user that will be used to connect to the S3 service.
- Attach the following policy to the user, replacing `bucket-name` with the name of the bucket you created:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "s3:PutObject",
              "s3:GetObject",
              "s3:AbortMultipartUpload",
              "s3:ListMultipartUploadParts",
              "s3:ListBucketMultipartUploads"
            ],
            "Resource": "arn:aws:s3:::bucket-name/*"
          }
        ]
      }

- Create access keys for the IAM user.
- Copy the access key ID and the secret access key. You will need them to configure the connector.
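If you script the bucket setup, the policy above can be generated per bucket instead of edited by hand. The following is a minimal sketch using only the Python standard library. The helper name `build_sink_policy` and the bucket name `my-sink-bucket` are illustrative assumptions, not part of any Redpanda or AWS tooling.

```python
import json

# Actions copied from the policy listed in the prerequisites.
SINK_ACTIONS = [
    "s3:PutObject",
    "s3:GetObject",
    "s3:AbortMultipartUpload",
    "s3:ListMultipartUploadParts",
    "s3:ListBucketMultipartUploads",
]


def build_sink_policy(bucket_name: str) -> dict:
    """Return the policy document with bucket_name substituted (hypothetical helper)."""
    if not bucket_name:
        raise ValueError("bucket_name must not be empty")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(SINK_ACTIONS),
                "Resource": f"arn:aws:s3:::{bucket_name}/*",
            }
        ],
    }


if __name__ == "__main__":
    # "my-sink-bucket" is a placeholder; use the name of your own bucket.
    print(json.dumps(build_sink_policy("my-sink-bucket"), indent=2))
```

The printed JSON can be pasted into the IAM console when attaching the policy to the user.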
Limitations
- You can use only the `STRING` and `BYTES` input formats with the `CSV` output format.
- You can use the `PARQUET` output format only when your messages contain a schema.
Create an AWS S3 Sink connector
To create the AWS S3 Sink connector:
- In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.
- Select Export to S3.
- On the Create Connector page, specify the following required connector configuration options:

  | Property | Description |
  |---|---|
  | Topics to export | Comma-separated list of the cluster topics whose records will be exported to the S3 bucket. |
  | AWS access key ID | Enter the AWS access key ID. |
  | AWS secret access key | Enter the AWS secret access key. |
  | AWS S3 bucket name | Specify the name of the AWS S3 bucket to which the connector sends data. |
  | AWS S3 region | Select the region of the S3 bucket used for storing the records. The default is `us-east-1`. |
  | Kafka message key format | Format of the key in the Kafka topic. The default is `BYTES`. |
  | Kafka message value format | Format of the value in the Kafka topic. The default is `BYTES`. |
  | S3 file format | Format of the files created in S3: `CSV` (the default), `AVRO`, `JSON`, `JSONL`, or `PARQUET`. You can use the `CSV` output format only with the `BYTES` and `STRING` input formats. |
  | Max Tasks | Maximum number of tasks to use for this connector. The default is `1`. Each task replicates an exclusive set of partitions assigned to it. |
  | Connector name | Globally unique name to use for this connector. |

- Click Next. Review the connector properties specified, then click Create.
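Before filling in the form, it can help to collect the required values in one place. The sketch below is a hypothetical checklist keyed by the UI labels from the table above. These keys are not the connector's internal property names, the example values are placeholders, and the defaults are the ones stated in the table.

```python
# Defaults stated in the table above, keyed by UI label (not internal property names).
DEFAULTS = {
    "AWS S3 region": "us-east-1",
    "Kafka message key format": "BYTES",
    "Kafka message value format": "BYTES",
    "S3 file format": "CSV",
    "Max Tasks": 1,
}

# Fields for which the table states no default, so you must supply a value.
REQUIRED_WITHOUT_DEFAULT = [
    "Topics to export",
    "AWS access key ID",
    "AWS secret access key",
    "AWS S3 bucket name",
    "Connector name",
]


def complete_settings(user_values: dict) -> dict:
    """Merge user values over the defaults and fail if a required field is missing."""
    settings = {**DEFAULTS, **user_values}
    missing = [name for name in REQUIRED_WITHOUT_DEFAULT if not settings.get(name)]
    if missing:
        raise ValueError("Missing required settings: " + ", ".join(missing))
    return settings


if __name__ == "__main__":
    # All values below are placeholders for illustration.
    example = complete_settings({
        "Topics to export": "orders,payments",
        "AWS access key ID": "EXAMPLE_KEY_ID",
        "AWS secret access key": "EXAMPLE_SECRET",
        "AWS S3 bucket name": "my-sink-bucket",
        "Connector name": "s3-sink-example",
    })
    for label, value in example.items():
        print(f"{label}: {value}")
```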
Advanced AWS S3 Sink connector configuration
In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:
| Property | Description |
|---|---|
|  | The template for file names on S3. Supports |
|  | The prefix to be added to the name of each file put in S3. |
|  | Fields to place into output files. Supported values are: 'key', 'value', 'offset', 'timestamp', and 'headers'. |
|  | The type of encoding to be used for the value field. Supported values are: 'none' and 'base64'. |
|  | The compression type to be used for files put into S3. Supported values are: 'none' (default), 'gzip', 'snappy', and 'zstd'. |
|  | The maximum number of records to put in a single file. Must be a non-negative number. 0 is interpreted as "unlimited", which is the default. In this case files are only flushed after |
|  | The time interval to periodically flush files and commit offsets. Value specified must be a non-negative number. Default is 60 seconds. 0 indicates that it is disabled. In this case, files are only flushed after reaching |
|  | If set to |
|  | The part size in S3 multi-part uploads in bytes. Maximum is 2147483647 (2 GB) and default is 5242880 (5 MB). |
|  | S3 default base sleep time (in milliseconds) for non-throttled exceptions. Default is 100. |
|  | S3 maximum back-off time (in milliseconds) before retrying a request. Default is 20000. |
|  | Maximum retry limit (if the value is greater than 30, there can be integer overflow issues during delay calculation). Default is 3. |
|  | Error tolerance response during connector operation. Default value is |
|  | The name of the topic to be used as the dead letter queue (DLQ) for messages that result in an error when processed by this sink connector, its transformations, or converters. The topic name is blank by default, which means that no messages are recorded in the DLQ. |
|  | Replication factor used to create the dead letter queue topic when it doesn’t already exist. |
|  | When |
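The three retry properties (base sleep time, maximum back-off time, and retry limit) interact. The sketch below shows one common scheme, capped exponential backoff, using the defaults listed in the table. It is an illustration only: the connector's exact delay formula, including any jitter, is not stated in this document, and the function name `backoff_delays_ms` is hypothetical.

```python
def backoff_delays_ms(base_ms: int = 100, max_backoff_ms: int = 20000, max_retries: int = 3) -> list:
    """Delay before each retry: base_ms doubled per attempt, capped at max_backoff_ms."""
    if base_ms < 0 or max_backoff_ms < 0 or max_retries < 0:
        raise ValueError("all arguments must be non-negative")
    return [min(base_ms * 2 ** attempt, max_backoff_ms) for attempt in range(max_retries)]


if __name__ == "__main__":
    # Defaults from the table: base 100 ms, cap 20000 ms, 3 retries.
    print(backoff_delays_ms())                # [100, 200, 400]
    # With more retries, the cap takes over once the doubled delay exceeds it.
    print(backoff_delays_ms(max_retries=10))
```

In a doubling scheme like this one, the multiplier on the 32nd attempt is 2^31, which no longer fits in a signed 32-bit integer. That is one plausible reading of the table's warning about retry limits above 30.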
Map data
Use the appropriate key or value converter (input data format) for your data as follows:
- `JSON` when your messages are JSON-encoded. Select Message JSON contains schema if each message carries the `schema` and `payload` fields.
- `AVRO` when your messages are Avro-encoded, with the schema stored in the Schema Registry.
- `STRING` when your messages contain textual data.
- `BYTES` when your messages contain arbitrary data.
You can also select the output data format for your S3 files as follows:
- `CSV` to produce data in the `CSV` format. With `CSV`, you can use only the `STRING` and `BYTES` input formats.
- `JSON` to produce data in the `JSON` format as an array of record objects.
- `JSONL` to produce data in the JSON Lines format, with each message as a separate JSON object, one per line.
- `PARQUET` to produce data in the `PARQUET` format when your messages contain a schema.
- `AVRO` to produce data in the `AVRO` format when your messages contain a schema.
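The input and output format rules above can be condensed into a small check. The following sketch is an illustration, not connector code. The function name `check_formats` is hypothetical, it looks only at the value format, and it assumes a message "contains a schema" when the input format is `AVRO`, or `JSON` with Message JSON contains schema selected.

```python
INPUT_FORMATS = {"JSON", "AVRO", "STRING", "BYTES"}
OUTPUT_FORMATS = {"CSV", "JSON", "JSONL", "PARQUET", "AVRO"}
CSV_INPUTS = {"STRING", "BYTES"}
SCHEMA_REQUIRED_OUTPUTS = {"PARQUET", "AVRO"}


def check_formats(value_format: str, output_format: str, json_has_schema: bool = False) -> list:
    """Return a list of problems; an empty list means the rules above are satisfied."""
    problems = []
    if value_format not in INPUT_FORMATS:
        problems.append(f"unknown input format: {value_format}")
    if output_format not in OUTPUT_FORMATS:
        problems.append(f"unknown output format: {output_format}")
    # Assumption: only AVRO input, or JSON input with an embedded schema, carries a schema.
    has_schema = value_format == "AVRO" or (value_format == "JSON" and json_has_schema)
    if output_format == "CSV" and value_format not in CSV_INPUTS:
        problems.append("CSV output requires STRING or BYTES input")
    if output_format in SCHEMA_REQUIRED_OUTPUTS and not has_schema:
        problems.append(f"{output_format} output requires messages that contain a schema")
    return problems


if __name__ == "__main__":
    print(check_formats("BYTES", "CSV"))       # []
    print(check_formats("STRING", "PARQUET"))  # one problem reported
```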
Test the connection
After the connector is created, test the connection by writing to one of your topics, then checking the contents of the S3 bucket in the AWS management console. Files should appear after the file flush interval (default is 60 seconds).
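To estimate when the first file should appear, it helps to model the two flush triggers described in the advanced options: the maximum number of records per file (0 means unlimited) and the flush interval (60 seconds by default, 0 means disabled). The sketch below is a simplified model of those stated semantics, not the connector's implementation. The name `should_flush` and its parameters are hypothetical.

```python
def should_flush(records_in_file: int, ms_since_last_flush: int,
                 max_records: int = 0, flush_interval_ms: int = 60000) -> bool:
    """Return True when either enabled trigger (record limit or interval) has fired."""
    if max_records < 0 or flush_interval_ms < 0:
        raise ValueError("max_records and flush_interval_ms must be non-negative")
    # A value of 0 disables the corresponding trigger.
    reached_record_limit = max_records > 0 and records_in_file >= max_records
    interval_elapsed = flush_interval_ms > 0 and ms_since_last_flush >= flush_interval_ms
    return reached_record_limit or interval_elapsed


if __name__ == "__main__":
    # With the defaults, only the 60-second interval triggers a flush.
    print(should_flush(records_in_file=10, ms_since_last_flush=59_999))  # False
    print(should_flush(records_in_file=10, ms_since_last_flush=60_000))  # True
```

With the default settings, a test message written just after a flush may take up to a full interval to show up in the bucket.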
Troubleshoot
If there are any connection issues, an error message is returned. Depending on
the AWS S3 bucket check property value, the error results in a failed connector
(AWS S3 bucket check = true) or a failed task (AWS S3 bucket check = false).
Select Show Logs to view error details.
Additional errors and corrective actions follow.
| Message | Action |
|---|---|
| The AWS Access Key Id you provided does not exist in our records |  |
| The authorization header is malformed; the region us-east-1 is wrong; expecting us-east-2 | The selected region ( |
| The specified bucket does not exist | Create the bucket specified in the |
| No files in the S3 bucket | Be sure to wait until the connector completes the first file flush (default 60 seconds). Verify that the topics specified are correct. Then verify that the topics contain messages to be pushed to S3. |
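When the logs contain the malformed authorization header message, the expected region can be read directly from the text. The following sketch extracts both regions with a regular expression based on the message wording shown in the table. The helper name `parse_region_mismatch` is hypothetical, and real log lines may differ in wording.

```python
import re

# Pattern based on the message wording in the table; optional quotes are tolerated.
REGION_MISMATCH = re.compile(
    r"the region '?(?P<configured>[a-z0-9-]+)'? is wrong; "
    r"expecting '?(?P<expected>[a-z0-9-]+)'?"
)


def parse_region_mismatch(message: str):
    """Return (configured_region, expected_region), or None if the message does not match."""
    match = REGION_MISMATCH.search(message)
    if match is None:
        return None
    return match.group("configured"), match.group("expected")


if __name__ == "__main__":
    log_line = ("The authorization header is malformed; "
                "the region us-east-1 is wrong; expecting us-east-2")
    print(parse_region_mismatch(log_line))  # ('us-east-1', 'us-east-2')
```

The second value is the region to select in the connector configuration.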