---
title: Kafka Sink
meta:
    description: Push events to Kafka on a batch-based schedule using Tinybird's fully managed Kafka Sink Connector.
---

# {% icon name="sink" /%} Kafka Sink

You can set up a Kafka Sink to export your data from Tinybird to any Kafka topic. The Kafka Sink allows you to push events to Kafka on a batch-based schedule using Tinybird's fully managed connector.

Setting up the Kafka Sink requires:

1. Creating a connection file in Tinybird with your Kafka configuration.
2. Creating a Sink pipe that uses this connection.

Tinybird represents Sinks using the {% icon name="sink" /%} icon.

{% callout type="info" %}
Kafka Sinks are available on the Developer and Enterprise plans. See [Plans](/forward/pricing).
{% /callout %}

## Environment considerations

Before setting up the Kafka Sink, understand how it works in different environments.

### Cloud environment

In the Tinybird Cloud environment, Tinybird connects directly to your Kafka cluster using the connection credentials you provide.

### Local environment

When using the Kafka Sink in the Tinybird Local environment, the connection is made from within the container to your Kafka cluster. Ensure your Kafka cluster is accessible from the container network.
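
For example, if your Kafka broker runs on the host machine, the Tinybird Local container can usually reach it through `host.docker.internal` (Docker Desktop) or the host's IP address (Linux). A minimal, hedged reachability check, assuming the container is named `tinybird-local` and that a shell with `nc` is available in the image:

```bash
# Assumptions: the Tinybird Local container is named "tinybird-local",
# the broker listens on port 9092 on the host, and `nc` exists in the image.
# Verify the broker is reachable from inside the container:
docker exec tinybird-local sh -c "nc -zv host.docker.internal 9092"
```

If the check succeeds, you can use `host.docker.internal:9092` as the bootstrap server in your Local connection.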

{% callout type="caution" %}
When using the Kafka Sink in the `--local` environment, scheduled sink operations are not supported. You can only run on-demand sinks using `tb sink run <pipe_name>`. For scheduled sink operations, use the Cloud environment.
{% /callout %}

## Set up the sink

{% steps %}

### Create a Kafka connection

You can create a Kafka connection in Tinybird using either the guided CLI process or by manually creating a connection file.

#### Option 1: Use the guided CLI process (recommended)

The Tinybird CLI provides a guided process that helps you set up the Kafka connection:

```bash
tb connection create kafka
```

When prompted:

1. Enter a name for your connection.
2. Provide the Kafka bootstrap servers as a comma-separated list.
3. Configure authentication settings (SASL/SSL if required).
4. Optionally, configure additional Kafka client properties.

#### Option 2: Manually create a connection file

Create a [.connection file](/forward/dev-reference/datafiles/connection-files) with the required credentials stored in secrets. For example:

```tb {% title="kafka_sample.connection" %}
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS bootstrap_servers:port
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM PLAIN
KAFKA_KEY {{ tb_secret("KAFKA_KEY", "key") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET", "secret") }}
```

See [Connection files](/forward/dev-reference/datafiles/connection-files) for more details on how to create a connection file and manage secrets.

{% callout type="caution" %}
You need to create a separate connection for each environment you work with: Local and Cloud.

For example, you can create:

- `kafka-local` for your Local environment
- `kafka-cloud` for your Cloud environment
{% /callout %}
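
Because the connection file above reads its credentials from secrets, each environment also needs its own secret values. A hedged sketch with placeholder credentials, assuming the `tb secret set` command is available in your CLI version:

```bash
# Local environment (default target). Values are placeholders.
tb secret set KAFKA_KEY "local-kafka-user"
tb secret set KAFKA_SECRET "local-kafka-password"

# Cloud environment.
tb --cloud secret set KAFKA_KEY "prod-kafka-user"
tb --cloud secret set KAFKA_SECRET "prod-kafka-password"
```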

### Create a Sink pipe

To create a Sink pipe, create a regular .pipe file and, in the SQL section, select and filter the data you want to export to your Kafka topic, as in any other pipe. Then set the pipe type to `sink` and add the export configuration. Your pipe should have the following structure:

```tb {% title="kafka_export.pipe" %}
NODE node_0

SQL >
    SELECT 
        customer_id,
        event_type,
        status,
        amount
    FROM events
    WHERE status = 'completed'

TYPE sink
EXPORT_CONNECTION_NAME "kafka_connection"
EXPORT_KAFKA_TOPIC "events_topic"
EXPORT_SCHEDULE "*/5 * * * *" 
```

### Deploy the Sink pipe

After defining your Sink pipe and connection, test the Kafka connection:

```bash
tb connection data <connection_name>
```

This command validates the connection details by attempting to consume from a topic, because it's designed for the Kafka source connector. Keep in mind that a Kafka Sink uses the connection to produce data to Kafka, not to consume from it. To see the connection details, run `tb --cloud connection ls`.

When ready, push the datafiles to your Workspace to create the Sink pipe:

```bash
tb --cloud deploy
```

This creates the Sink pipe in your workspace and makes it available for execution.
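
To confirm the deployment, list the pipes in your Cloud Workspace:

```bash
tb --cloud pipe ls
```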

{% /steps %}

## .connection settings

The Kafka connector uses the following settings in .connection files (a combined example follows the table):

{% table %}
   * Instruction
   * Required
   * Description
   ---
   * `KAFKA_BOOTSTRAP_SERVERS`
   * Yes
   * Comma-separated list of one or more Kafka brokers, including port numbers.
   ---
   * `KAFKA_KEY`
   * Yes
   * Key used to authenticate with Kafka. Sometimes called Key, Client Key, or Username depending on the Kafka distribution.
   ---
   * `KAFKA_SECRET`
   * Yes
   * Secret used to authenticate with Kafka. Sometimes called Secret, Secret Key, or Password depending on the Kafka distribution.
   ---
   * `KAFKA_SECURITY_PROTOCOL`
   * No
   * Security protocol for the connection. Accepted values are `PLAINTEXT` and `SASL_SSL`. Default value is `SASL_SSL`.
   ---
   * `KAFKA_SASL_MECHANISM`
   * No
   * SASL mechanism to use for authentication. Supported values are `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`. Default value is `PLAIN`.
   ---
   * `KAFKA_SCHEMA_REGISTRY_URL`
   * No
   * URL of the Kafka schema registry. Used for `avro` and `json_with_schema` deserialization of
     keys and values. If Basic Auth is required, include credentials in the URL format:
     `https://<username>:<password>@registry.example.com`
   ---
   * `KAFKA_SSL_CA_PEM`
   * No
   * Content of the CA certificate in PEM format for SSL connections.
{% /table %}
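
As an illustrative sketch, a connection file for a SCRAM-authenticated cluster that also uses a schema registry might combine these settings as follows. All values are placeholders; adjust them to your cluster:

```tb {% title="kafka_scram.connection" %}
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS broker-1.example.com:9093,broker-2.example.com:9093
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM SCRAM-SHA-256
KAFKA_KEY {{ tb_secret("KAFKA_KEY", "key") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET", "secret") }}
KAFKA_SCHEMA_REGISTRY_URL https://registry.example.com
```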

## .pipe settings

The Kafka Sink pipe uses the following settings in .pipe files:

{% table %}
  * Key
  * Type
  * Description
  ---
  * `EXPORT_CONNECTION_NAME`
  * string
  * Required. The connection name to the destination service. This is the connection created in Step 1.
  ---
  * `EXPORT_KAFKA_TOPIC`
  * string
  * Required. The Kafka topic where events are published.
  ---
  * `EXPORT_SCHEDULE`
  * string
  * Required. A cron expression that sets the frequency of the Sink operation, or the `@on-demand` string for manual runs (see the example after this table).
{% /table %}
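
For Sink pipes that you only want to trigger manually, replace the cron expression with the `@on-demand` string in the export settings:

```tb
TYPE sink
EXPORT_CONNECTION_NAME "kafka_connection"
EXPORT_KAFKA_TOPIC "events_topic"
EXPORT_SCHEDULE "@on-demand"
```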

### Scheduling considerations

The schedule applied to a Sink pipe doesn't guarantee that the underlying job executes immediately at the configured time. When the scheduled time arrives, the job is placed in a queue; if the queue is busy, the job can be delayed and run after its scheduled time.

To reduce the chances of a busy queue delaying your Sink pipe, spread your scheduled jobs over a wider period of time rather than grouping them close together. For example, instead of scheduling two hourly Sink pipes both at `0 * * * *`, schedule one at `5 * * * *` and the other at `35 * * * *`.

### Query parameters

You can add [query parameters](/forward/work-with-data/query-parameters) to your Sink pipes, the same way you do in API Endpoints or Copy pipes.

- For on-demand executions, you can set the parameters to any values when you trigger the Sink pipe (see the sketch after this list).
- For scheduled executions, the default values for the parameters are used when the Sink pipe runs.
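
As a sketch, the node below exposes a hypothetical `status_filter` parameter with a default of `completed`. Scheduled runs use the default value, while on-demand runs can override it:

```tb {% title="kafka_export_params.pipe" %}
NODE node_0

SQL >
    %
    SELECT
        customer_id,
        event_type,
        status,
        amount
    FROM events
    WHERE status = {{ String(status_filter, 'completed') }}

TYPE sink
EXPORT_CONNECTION_NAME "kafka_connection"
EXPORT_KAFKA_TOPIC "events_topic"
EXPORT_SCHEDULE "0 * * * *"
```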

## Execute the Sink pipe

### On-demand execution

You can trigger your Sink pipe manually using:

```bash
tb sink run <pipe_name>
```

{% callout type="tip" %}
When triggering a Sink pipe, you can override several of its settings, such as the topic or format. Refer to the [Sink pipes API spec](/api-reference/sink-pipes-api) for the full list of parameters.
{% /callout %}

### Scheduled execution

If you configured a schedule with `EXPORT_SCHEDULE`, the Sink pipe runs automatically according to the cron expression.

Once the Sink pipe is triggered, it creates a standard Tinybird job that can be followed via the `v0/jobs` API or using `tb job ls --kind=sink`.
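
For example, after triggering a run you can check recent Sink jobs from the CLI, or query the Jobs API directly. The environment variables below are assumptions: `$TB_HOST` points at your region's API host and `$TB_TOKEN` is a token with the required scope.

```bash
# List recent Sink jobs from the CLI:
tb job ls --kind=sink

# Or query the Jobs API and inspect the sink jobs in the response:
curl -H "Authorization: Bearer $TB_TOKEN" "$TB_HOST/v0/jobs"
```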

## Observability

Sink pipe operations are logged in the [tinybird.jobs_log](/forward/monitoring/service-datasources#tinybird-jobs-log) Service Data Source. You can filter by `job_type = 'sink'` to see only Sink pipe executions.

For more detailed Sink-specific information, you can also use [tinybird.sinks_ops_log](/forward/monitoring/service-datasources#tinybird-sinks-ops-log).

## Limits & quotas

{% snippet title="forward-limits-reminder" /%}

## Billing

When a Sink pipe executes, it uses your plan's included compute resources (vCPUs and active minutes) to run the query, then publishes the result to Kafka.

### Enterprise customers

Tinybird includes Data Transfer allowances for Enterprise customers. Contact your Customer Success team or email us at <support@tinybird.co> to discuss your specific requirements.

## Performance

### Throughput considerations

Kafka Sink performance depends on:

- Query result size
- Network latency to the Kafka cluster
- Kafka cluster capacity
- Message serialization overhead

**Monitor Sink performance:**

```sql
SELECT
    pipe_name,
    job_type,
    status,
    elapsed_time,
    rows_written,
    bytes_written
FROM tinybird.jobs_log
WHERE job_type = 'sink'
  AND timestamp > now() - INTERVAL 24 hour
ORDER BY timestamp DESC
```

### Optimization tips

1. **Batch size**: Larger batches improve throughput but increase latency
2. **Schedule frequency**: Balance between freshness and resource usage
3. **Query optimization**: Optimize your Sink pipe queries for performance
4. **Message format**: Choose a serialization format that balances message size and processing cost for your downstream consumers

## Use cases and examples

### Real-time event streaming

Stream processed events to downstream systems:

```tb {% title="event_stream.pipe" %}
NODE node_0

SQL >
    SELECT
        user_id,
        event_type,
        timestamp,
        metadata
    FROM processed_events
    WHERE timestamp > now() - INTERVAL 1 hour
      AND status = 'processed'

TYPE sink
EXPORT_CONNECTION_NAME "kafka_connection"
EXPORT_KAFKA_TOPIC "downstream_events"
EXPORT_SCHEDULE "*/5 * * * *"
```

### Data synchronization

Sync data changes to external systems:

```tb {% title="user_sync.pipe" %}
NODE node_0

SQL >
    SELECT
        user_id,
        email,
        updated_at,
        status
    FROM users
    WHERE updated_at > now() - INTERVAL 1 hour

TYPE sink
EXPORT_CONNECTION_NAME "kafka_connection"
EXPORT_KAFKA_TOPIC "user_updates"
EXPORT_SCHEDULE "0 * * * *"
```

### Analytics data export

Export aggregated analytics to Kafka for further processing:

```tb {% title="analytics_export.pipe" %}
NODE node_0

SQL >
    SELECT
        toStartOfHour(timestamp) as hour,
        event_type,
        count() as event_count,
        uniq(user_id) as unique_users
    FROM events
    WHERE timestamp > now() - INTERVAL 1 hour
    GROUP BY hour, event_type

TYPE sink
EXPORT_CONNECTION_NAME "kafka_connection"
EXPORT_KAFKA_TOPIC "hourly_analytics"
EXPORT_SCHEDULE "0 * * * *"
```

## Troubleshooting

### Issue: Sink not executing

**Symptoms:**
- No jobs in `jobs_log`
- Scheduled Sink not running

**Solutions:**
1. Verify schedule syntax is correct (cron format)
2. Check Sink pipe is deployed: `tb pipe ls`
3. Test the connection: `tb connection data <connection_name>` (this validates connection details, although the command itself consumes from a topic because it's built for the Kafka source connector)
4. Check for errors in `jobs_log`

### Issue: Messages not appearing in Kafka

**Symptoms:**
- Sink executes successfully
- No messages in Kafka topic

**Solutions:**
1. Verify topic name is correct
2. Check Kafka connection credentials
3. Verify topic exists in Kafka cluster
4. Check Kafka producer permissions
5. Review `sinks_ops_log` for errors

### Issue: Slow Sink execution

**Symptoms:**
- Long execution times
- Timeouts

**Solutions:**
1. Optimize Sink pipe query
2. Reduce query result size
3. Add filters to limit data
4. Check network latency to Kafka
5. Monitor Kafka cluster performance

### Monitoring Sink operations

**Check Sink execution status:**

```sql
SELECT
    timestamp,
    pipe_name,
    status,
    elapsed_time,
    rows_written,
    error
FROM tinybird.jobs_log
WHERE job_type = 'sink'
  AND timestamp > now() - INTERVAL 24 hour
ORDER BY timestamp DESC
```

**Check Sink-specific metrics:**

```sql
SELECT
    timestamp,
    pipe_name,
    topic,
    messages_sent,
    bytes_sent,
    error
FROM tinybird.sinks_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
ORDER BY timestamp DESC
```

## Next steps

- Get familiar with the [Service Data Sources](/forward/monitoring/service-datasources) and see what's going on in your account
- Deep dive on Tinybird's [pipes concept](/forward/work-with-data/pipes)
- Review [Kafka connector documentation](/forward/get-data-in/connectors/kafka) for Kafka setup
- Learn about [monitoring Sink operations](/forward/monitoring/service-datasources#tinybird-sinks-ops-log)