Kafka Sink

You can set up a Kafka Sink to export your data from Tinybird to any Kafka topic. The Kafka Sink allows you to push events to Kafka on a batch-based schedule using Tinybird's fully managed connector.

Setting up the Kafka Sink requires:

  1. Creating a connection file in Tinybird with your Kafka configuration.
  2. Creating a Sink pipe that uses this connection.

Environment considerations

Before setting up the Kafka Sink, understand how it works in different environments.

Cloud environment

In the Tinybird Cloud environment, Tinybird connects directly to your Kafka cluster using the connection credentials you provide.

Local environment

When using the Kafka Sink in the Tinybird Local environment, the connection is made from within the container to your Kafka cluster. Ensure your Kafka cluster is accessible from the container network.

When using the Kafka Sink in the Local environment, scheduled Sink operations aren't supported. You can only run on-demand Sinks using tb sink run <pipe_name>. For scheduled Sink operations, use the Cloud environment.
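For example, during local development you can trigger the export manually and rely on the schedule only after the pipe is deployed to Cloud. A minimal sketch, assuming a Sink pipe named kafka_export and the --local and --cloud flags used elsewhere in this guide:

# Run an on-demand export against Tinybird Local
tb --local sink run kafka_export

# In Cloud, the same pipe also runs on its EXPORT_SCHEDULE, and can still be triggered manually
tb --cloud sink run kafka_export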

Set up the sink

Step 1: Create a Kafka connection

You can create a Kafka connection in Tinybird using either the guided CLI process or by manually creating a connection file.

Option 1: Use the guided CLI process

The Tinybird CLI provides a guided process that helps you set up the Kafka connection:

tb connection create kafka

When prompted, you'll need to:

  1. Enter a name for your connection.
  2. Provide the Kafka bootstrap servers (comma-separated list).
  3. Configure authentication settings (SASL/SSL if required).
  4. Optionally configure additional Kafka client properties.

Option 2: Manually create a connection file

Create a .connection file with the required credentials stored in secrets. For example:

kafka_sample.connection
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS bootstrap_servers:port
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM PLAIN
KAFKA_KEY {{ tb_secret("KAFKA_KEY", "key") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET", "secret") }}

See Connection files for more details on how to create a connection file and manage secrets.
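The tb_secret calls above read KAFKA_KEY and KAFKA_SECRET from your Workspace secrets, falling back to the default values ("key" and "secret") when the secret isn't defined. A minimal sketch for defining them, assuming the tb secret set command described in the secrets documentation:

# Define the secrets in Tinybird Local
tb secret set KAFKA_KEY "my-kafka-user"
tb secret set KAFKA_SECRET "my-kafka-password"

# Define the secrets in Tinybird Cloud
tb --cloud secret set KAFKA_KEY "my-kafka-user"
tb --cloud secret set KAFKA_SECRET "my-kafka-password"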

You need to create a separate connection for each environment you work with: Local and Cloud.

For example, you can create:

  • kafka-local for your Local environment
  • kafka-cloud for your Cloud environment
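
One way to do this, sketched here under the assumption that the --local and --cloud flags shown in later steps also apply to connection commands, is to run the guided process against each environment and provide the corresponding name when prompted:

# Create the connection for the Local environment (name it kafka-local when prompted)
tb --local connection create kafka

# Create the connection for the Cloud environment (name it kafka-cloud when prompted)
tb --cloud connection create kafka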

Step 2: Create a Sink pipe

To create a Sink pipe, create a regular .pipe file and, in the SQL section, select and filter the data you want to export to your Kafka topic, as in any other pipe. Then set the pipe type to sink and add the required export settings. Your pipe should have the following structure:

kafka_export.pipe
NODE node_0

SQL >
    SELECT 
        customer_id,
        event_type,
        status,
        amount
    FROM events
    WHERE status = 'completed'

TYPE sink
EXPORT_CONNECTION_NAME "kafka_connection"
EXPORT_KAFKA_TOPIC "events_topic"
EXPORT_SCHEDULE "*/5 * * * *" 
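
If you only want to trigger the export manually, you can replace the cron expression with the @on-demand value described in the .pipe settings below. A minimal variation of the same export section:

TYPE sink
EXPORT_CONNECTION_NAME "kafka_connection"
EXPORT_KAFKA_TOPIC "events_topic"
EXPORT_SCHEDULE "@on-demand"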

Step 3: Deploy the Sink pipe

After defining your Sink pipe and connection, test it by running a deploy check:

tb --cloud deploy --check

This validates the deployment, including the connection, without applying any changes. To see the connection details, run tb --cloud connection ls.

When ready, push the datafiles to your Workspace to create the Sink pipe:

tb --cloud deploy

This creates the Sink pipe in your workspace and makes it available for execution.
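Once deployed, you can trigger a first export to check that events reach your topic. A sketch, assuming the tb sink run command covered later in this guide:

# Trigger the Sink pipe once in Cloud
tb --cloud sink run kafka_export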

.connection settings

The Kafka connector uses the following settings in .connection files:

| Instruction | Required | Description |
| --- | --- | --- |
| KAFKA_BOOTSTRAP_SERVERS | Yes | Comma-separated list of one or more Kafka brokers, including port numbers. |
| KAFKA_KEY | Yes | Key used to authenticate with Kafka. Sometimes called Key, Client Key, or Username depending on the Kafka distribution. |
| KAFKA_SECRET | Yes | Secret used to authenticate with Kafka. Sometimes called Secret, Secret Key, or Password depending on the Kafka distribution. |
| KAFKA_SECURITY_PROTOCOL | No | Security protocol for the connection. Accepted values are PLAINTEXT and SASL_SSL. Default value is SASL_SSL. |
| KAFKA_SASL_MECHANISM | No | SASL mechanism to use for authentication. Supported values are PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512. Default value is PLAIN. |
| KAFKA_SCHEMA_REGISTRY_URL | No | URL of the Kafka schema registry. Used for avro and json_with_schema deserialization of keys and values. If Basic Auth is required, include it in the URL, as in https://user:password@registry_url. |
| KAFKA_SSL_CA_PEM | No | Content of the CA certificate in PEM format for SSL connections. |
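For example, a connection to a cluster that uses SCRAM authentication could look like the following sketch; the file name and broker addresses are placeholders:

kafka_scram.connection
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS broker-1:9092,broker-2:9092
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM SCRAM-SHA-256
KAFKA_KEY {{ tb_secret("KAFKA_KEY", "key") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET", "secret") }}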

.pipe settings

The Kafka Sink pipe uses the following settings in .pipe files:

| Key | Type | Description |
| --- | --- | --- |
| EXPORT_CONNECTION_NAME | string | Required. The name of the connection to the destination service. This is the connection created in Step 1. |
| EXPORT_KAFKA_TOPIC | string | Required. The Kafka topic where events are published. |
| EXPORT_SCHEDULE | string | Required. A crontab expression that sets the frequency of the Sink operation, or the @on-demand string. |

Scheduling considerations

The schedule applied to a Sink pipe doesn't guarantee that the underlying job executes immediately at the configured time. When the configured time elapses, the job is placed into a job queue; if the queue is busy, the job might be delayed and run after the scheduled time.

To reduce the chances of a busy queue affecting your Sink pipe execution schedule, distribute the jobs over a wider period of time rather than grouping them close together.
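For example, instead of scheduling several Sink pipes with EXPORT_SCHEDULE "0 * * * *", you could give one pipe "5 * * * *" and another "35 * * * *" so their jobs don't enter the queue at the same time.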

Query parameters

You can add query parameters to your Sink pipes, the same way you do in API Endpoints or Copy pipes.

  • For on-demand executions, you can set the parameters to any values you wish when you trigger the Sink pipe.
  • For scheduled executions, the default values of the parameters are used when the Sink pipe runs.
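For instance, if the node filtered by a status parameter, an on-demand run could override it while scheduled runs keep its default. A hedged sketch, assuming the /v0/pipes/<pipe_name>/sink trigger endpoint from the Sink pipes API, api.tinybird.co as your region's API host, and a token with the right scope; the status parameter is hypothetical:

# Trigger the Sink pipe on demand, overriding the hypothetical status parameter
curl -X POST "https://api.tinybird.co/v0/pipes/kafka_export/sink?status=pending" \
  -H "Authorization: Bearer $TB_TOKEN"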

Execute the Sink pipe

On-demand execution

You can trigger your Sink pipe manually using:

tb sink run <pipe_name>

When triggering a Sink pipe, you can override several of its settings, such as the topic or format. Refer to the Sink pipes API spec for the full list of parameters.

Scheduled execution

If you configured a schedule with EXPORT_SCHEDULE, the Sink pipe will run automatically according to the cron expression.

Once the Sink pipe is triggered, it creates a standard Tinybird job that can be followed via the v0/jobs API or using tb job ls --kind=sink.
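For example, a hedged sketch assuming the v0/jobs API accepts a kind filter and that api.tinybird.co is your region's API host:

# List recent sink jobs and their status
curl -H "Authorization: Bearer $TB_TOKEN" \
  "https://api.tinybird.co/v0/jobs?kind=sink"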

Observability

Sink pipe operations are logged in the tinybird.jobs_log Service Data Source. You can filter by job_type = 'sink' to see only Sink pipe executions.

For more detailed Sink-specific information, you can also use tinybird.sinks_ops_log.
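For example, a quick way to inspect recent Sink executions from the CLI, assuming the tb sql command and the usual jobs_log columns:

# Show the ten most recent Sink jobs and their status
tb --cloud sql "SELECT job_id, pipe_name, status, created_at, error FROM tinybird.jobs_log WHERE job_type = 'sink' ORDER BY created_at DESC LIMIT 10"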

Limits & quotas

Check the limits page for limits on ingestion, queries, API Endpoints, and more.

Billing

When a Sink pipe executes, it uses your plan's included compute resources (vCPUs and active minutes) to run the query, then publishes the result to Kafka.

Enterprise customers

Tinybird includes Data Transfer allowances for Enterprise customers. Contact your Customer Success team or email us at support@tinybird.co to discuss your specific requirements.
