---
title: Kafka connector
meta:
   description: Complete guide to setting up and configuring the Kafka connector for Tinybird. Connect to Kafka, Confluent Cloud, AWS MSK, Redpanda, and other Kafka-compatible platforms. Includes authentication, troubleshooting, and performance optimization.
---

# Kafka connector

You can set up a Kafka connector to consume data from a Kafka topic and store it in Tinybird by creating a [.connection](/forward/dev-reference/datafiles/connection-files) and a [.datasource](/forward/dev-reference/datafiles/datasource-files) file. Use the `tb datasource create --kafka` command for a guided process.

## Set up the connector

To set up the Kafka connector, follow these steps.

{% steps %}

### Create a Kafka connection

You can create a Kafka connection in Tinybird either with the CLI or by manually creating a connection file.

#### Option 1: Use the CLI (recommended)

Run the following command to create a connection:

```bash
tb connection create kafka
```

You are prompted to enter:

1. A name for your connection.
2. The bootstrap server.
3. The Kafka key.
4. The Kafka secret.

{% callout type="info" %}
If you need to add `KAFKA_SCHEMA_REGISTRY_URL` or any of the [Kafka .connection settings](#kafka-connection-settings), edit the .connection file manually.
{% /callout %}

#### Option 2: Manually create a connection file

Create a [.connection file](/forward/dev-reference/datafiles/connection-files) with the required credentials stored in secrets. Check [Kafka .connection settings](#kafka-connection-settings) for a complete list of Kafka connection settings.

{% callout type="info" %}
Secrets are only replaced in your resources when you deploy. If you change a secret, you need to
deploy for the changes to take effect.
{% /callout %}

#### Authentication methods

Tinybird supports multiple authentication methods for Kafka connections:

##### SASL/PLAIN

The most common authentication method for cloud Kafka providers like Confluent Cloud.

```tb {% title="kafka_sasl_plain.connection" %}
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS <BOOTSTRAP_SERVERS:PORT>
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM PLAIN
KAFKA_KEY {{ tb_secret("KAFKA_KEY", "key") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET", "secret") }}
```

Set the Kafka key and secret using [tb secret](/forward/dev-reference/commands/tb-secret):

```bash
tb [--cloud] secret set KAFKA_KEY <KAFKA_KEY>
tb [--cloud] secret set KAFKA_SECRET <KAFKA_SECRET>
```

##### SASL/SCRAM-SHA-256 or SCRAM-SHA-512

For Kafka clusters using SCRAM authentication:

```tb {% title="kafka_sasl_scram.connection" %}
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS <BOOTSTRAP_SERVERS:PORT>
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM SCRAM-SHA-256
KAFKA_KEY {{ tb_secret("KAFKA_KEY", "key") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET", "secret") }}
```

Replace `SCRAM-SHA-256` with `SCRAM-SHA-512` if your cluster uses SHA-512.

##### SASL/OAuthBearer (AWS MSK)

For AWS MSK clusters using IAM authentication:

```tb {% title="kafka_sasl_oauthbearer_aws.connection" %}
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS <BOOTSTRAP_SERVERS:PORT>
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM OAUTHBEARER
KAFKA_SASL_OAUTHBEARER_METHOD AWS
KAFKA_SASL_OAUTHBEARER_AWS_REGION <AWS_REGION>
KAFKA_SASL_OAUTHBEARER_AWS_ROLE_ARN {{ tb_secret("AWS_ROLE_ARN") }}
KAFKA_SASL_OAUTHBEARER_AWS_EXTERNAL_ID <AWS_EXTERNAL_ID>
```

When using the AWS method, you need to set up the required AWS IAM role with appropriate permissions. See the [AWS IAM permissions](#aws-iam-permissions) section for details on the required access policy and trust policy configurations.

Set the AWS role ARN using [tb secret](/forward/dev-reference/commands/tb-secret):

```bash
tb [--cloud] secret set AWS_ROLE_ARN <AWS_ROLE_ARN>
```

For detailed setup instructions for specific vendors, see:
- [Confluent Cloud setup guide](kafka/guides/confluent-cloud-setup)
- [AWS MSK setup guide](kafka/guides/aws-msk-setup)
- [Redpanda setup guide](kafka/guides/redpanda-setup)

##### PLAINTEXT (local development only)

For local development with unsecured Kafka:

```tb {% title="kafka_plaintext.connection" %}
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS localhost:9092
KAFKA_SECURITY_PROTOCOL PLAINTEXT
```

{% callout type="warning" %}
PLAINTEXT connections are only suitable for local development. Production Kafka clusters should use SASL_SSL.
{% /callout %}

For more information on local development setup, see the [local development guide](kafka/guides/local-development).

### Bootstrap servers configuration

The `KAFKA_BOOTSTRAP_SERVERS` setting specifies one or more Kafka brokers to connect to. Use comma-separated values for multiple brokers:

```tb
KAFKA_BOOTSTRAP_SERVERS broker1:9092,broker2:9092,broker3:9092
```

**Important considerations:**
- Use the advertised listeners address, not the internal broker address
- For cloud providers, use the public endpoint provided by your Kafka service
- Ensure the port matches your security protocol (9092 for PLAINTEXT, 9093-9096 for SASL_SSL depending on provider)
- For AWS MSK, use the bootstrap broker string from the MSK console
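
As a quick sanity check before deploying, the following Python sketch parses a `KAFKA_BOOTSTRAP_SERVERS` value into host and port pairs and tests whether a broker accepts a TCP connection. The helper names `parse_bootstrap_servers` and `is_reachable` are illustrative and not part of the Tinybird CLI. The check runs from your machine, not from Tinybird, and a successful TCP connection doesn't prove that TLS or authentication will succeed.

```python
import socket


def parse_bootstrap_servers(value: str) -> list[tuple[str, int]]:
    """Split a comma-separated broker list into (host, port) pairs."""
    brokers = []
    for entry in value.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry.strip()!r}")
        brokers.append((host, int(port)))
    return brokers


def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # DNS failure, refused connection, or timeout
        return False
```

For example, `parse_bootstrap_servers("broker1:9092,broker2:9092")` returns `[("broker1", 9092), ("broker2", 9092)]`, and each pair can then be passed to `is_reachable`.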

If you encounter connection issues, see the [troubleshooting guide](kafka/troubleshooting#error-connection-timeout-or-broker-unreachable) for bootstrap server configuration help.

### SSL/TLS certificate configuration

When using the `SASL_SSL` security protocol, you may need to provide a CA certificate, especially for:
- Self-signed certificates
- Private CA certificates
- Aiven Kafka (Private CA port)

The recommended way to provide the certificate is using secrets.

Create the multiline secret with the certificate using [tb secret](/forward/dev-reference/commands/tb-secret#tb-secret-set). This opens an editor where you enter the value.

```bash
tb [--cloud] secret set --multiline KAFKA_PROD_SSL_CA_PEM
```

Use the secret in the `KAFKA_SSL_CA_PEM` setting of the connection file. Note that this is a [multiline setting](/forward/dev-reference/datafiles#multiple-lines).

```tb {% title="/connections/kafka_ssl_ca_pem.connection" %}
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("KAFKA_PROD_SERVER", "kafka:29092") }}
KAFKA_SECURITY_PROTOCOL {{ tb_secret("KAFKA_PROD_SECURITY_PROTOCOL", "PLAINTEXT") }}
KAFKA_SASL_MECHANISM {{ tb_secret("KAFKA_PROD_SASL_MECHANISM", "PLAIN") }}
KAFKA_KEY {{ tb_secret("KAFKA_PROD_KEY", "key") }}
KAFKA_SECRET {{ tb_secret("KAFKA_PROD_SECRET", "secret") }}
KAFKA_SSL_CA_PEM >
  {{ tb_secret("KAFKA_PROD_SSL_CA_PEM") }}
```

**Certificate format requirements:**
- Certificate must be in PEM format (starts with `-----BEGIN CERTIFICATE-----`)
- Include the full certificate chain if required
- For Aiven Kafka, download the CA certificate from the Aiven console
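
Before storing a certificate as a secret, you can check its PEM structure locally. The following Python sketch is a hypothetical helper, not a Tinybird tool: it verifies that the text starts with `-----BEGIN CERTIFICATE-----`, splits a chain into its blocks, and confirms each block is valid base64. It doesn't verify that the blocks are valid X.509 certificates or that the chain is in the right order.

```python
import base64
import re

PEM_BLOCK = re.compile(
    r"-----BEGIN CERTIFICATE-----\s+(.*?)\s+-----END CERTIFICATE-----",
    re.DOTALL,
)


def split_pem_chain(pem_text: str) -> list[bytes]:
    """Return the decoded bytes of each certificate block, in file order."""
    if not pem_text.lstrip().startswith("-----BEGIN CERTIFICATE-----"):
        raise ValueError("PEM must start with -----BEGIN CERTIFICATE-----")
    bodies = PEM_BLOCK.findall(pem_text)
    if not bodies:
        raise ValueError("no complete BEGIN/END CERTIFICATE block found")
    # validate=True rejects characters outside the base64 alphabet.
    return [base64.b64decode("".join(body.split()), validate=True) for body in bodies]
```

The length of the returned list is the number of certificates in the chain, which is a quick way to confirm that intermediate certificates were included.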

For troubleshooting SSL certificate issues, see the [troubleshooting guide](kafka/troubleshooting#error-ssltls-certificate-validation-failed).

### Create a Kafka data source

Create a .datasource file using `tb datasource create --kafka` or manually.

Define the data source schema as with any non-Kafka data source and specify the required Kafka
settings. The value of `KAFKA_CONNECTION_NAME` must match the name of the .connection file
you created in the previous step.

The default .datasource stores the whole message in a column called `data`. Then, you can use [JSONExtract functions](/sql-reference/functions/json-functions#jsonextract-functions) to access the message fields, either at query time or using materialized views.

```tb {% title="kafka_default.datasource" %}
SCHEMA >
    `data` String `json:$`

KAFKA_CONNECTION_NAME kafka_connection # The name of the .connection file
KAFKA_TOPIC topic_name
KAFKA_GROUP_ID {{ tb_secret("KAFKA_GROUP_ID") }}
```

You can always use [JSONPaths](/forward/dev-reference/datafiles/datasource-files#jsonpath-expressions) syntax to extract the message fields into separate columns at ingest time.

```tb {% title="kafka_sample.datasource" %}
SCHEMA >
   `timestamp` DateTime(3) `json:$.timestamp`,
   `session_id` String `json:$.session_id`,
   `action` LowCardinality(String) `json:$.action`,
   `version` LowCardinality(String) `json:$.version`,
   `payload` String `json:$.payload`,
   `data` String `json:$`

KAFKA_CONNECTION_NAME kafka_sample # The name of the .connection file
KAFKA_TOPIC test_topic
KAFKA_GROUP_ID {{ tb_secret("KAFKA_GROUP_ID") }}
```

In addition to the columns specified in `SCHEMA`, Kafka data sources have additional columns
that store metadata of the messages ingested. See [Kafka meta columns](#kafka-meta-columns) for more
information.

For a complete list of Kafka data source settings, see [Kafka .datasource settings](#kafka-datasource-settings).

{% callout type="caution" %}
Use a different `KAFKA_GROUP_ID` value in each environment to isolate consumers and their committed offsets.
{% /callout %}

{% callout type="info" %}
If you use [Kafka Tombstones](https://docs.confluent.io/kafka/design/log_compaction.html#compaction-enables-deletes), Tinybird sends these `null`-payload messages to the Quarantine Data Source if your Data Source's schema can't accept them. You can inspect the tombstone records without affecting your main Data Source.
{% /callout %}

### Connectivity check

After defining your Kafka data source and connection, test the connection and preview data:

```bash
tb connection data <connection_name>
```

This command:
1. Prompts you to select a Kafka topic.
2. Prompts you to enter a consumer group ID.
3. Returns preview data from the topic.

This validates that Tinybird can connect to your Kafka broker, authenticate, and consume messages. Remember to set any secrets used by the connection.

{% /steps %}

## Compatibility

The connector is compatible with Apache Kafka and works with any compatible implementation and vendor. The following are tried and tested:

- Apache Kafka
- Confluent Platform and Confluent Cloud Platform
- Redpanda
- AWS MSK
- Azure Event Hubs for Apache Kafka
- Estuary

For vendor-specific setup guides, see:
- [Confluent Cloud setup](kafka/guides/confluent-cloud-setup)
- [AWS MSK setup](kafka/guides/aws-msk-setup)
- [Redpanda setup](kafka/guides/redpanda-setup)

## Schema management and evolution

Tinybird supports multiple serialization formats for Kafka messages:

- **JSON** (`json_without_schema` or `json_with_schema`)
- **Avro** (requires Schema Registry)
- **JSON Schema** (requires Schema Registry)

When using Schema Registry, Tinybird automatically handles schema evolution for backward-compatible changes. For detailed information on schema management, including adding fields, handling nullable types, and data type mapping, see the [schema management guide](kafka/guides/schema-management).

### Message size limits

Tinybird has a default message size limit of 10 MB. Messages exceeding this limit are sent to the Quarantine Data Source. For strategies on handling large messages and troubleshooting quarantined messages, see the [message size handling guide](kafka/guides/message-size-handling).

## Kafka .datasource settings

{% table %}
   * Instruction
   * Required
   * Description
   ---
   * `KAFKA_CONNECTION_NAME`
   * Yes
   * Name of the configured Kafka connection in Tinybird. It must match the name of the
     connection file (without the extension).
   ---
   * `KAFKA_TOPIC`
   * Yes
   * Name of the Kafka topic to consume from.
   ---
   * `KAFKA_GROUP_ID`
   * Yes
   * Consumer Group ID to use when consuming from Kafka.
   ---
   * `KAFKA_AUTO_OFFSET_RESET`
   * No
   * Offset to use when no previous offset can be found, like when creating a new consumer. Supported values are `latest` and `earliest`. Default: `latest`.
   ---
   * `KAFKA_STORE_HEADERS`
   * No
   * Adds a `__headers Map(String, String)` column to the data source, and stores Kafka headers in it for later processing. Default value is `False`.
   ---
   * `KAFKA_STORE_RAW_VALUE`
   * No
   * Stores the raw message in its entirety in the `__value` column. Default: `False`.
   ---
   * `KAFKA_KEY_FORMAT`
   * No
   * Format of the message key. Valid values are `avro`, `json_with_schema`, and `json_without_schema`. Using `avro` or `json_with_schema` requires `KAFKA_SCHEMA_REGISTRY_URL` to be set in the connection file used by the data source.
   ---
   * `KAFKA_VALUE_FORMAT`
   * No
   * Format of the message value. Valid values are `avro`, `json_with_schema`, and `json_without_schema`. Using `avro` or `json_with_schema` requires `KAFKA_SCHEMA_REGISTRY_URL` to be set in the connection file used by the data source.
{% /table %}
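
How `KAFKA_AUTO_OFFSET_RESET` interacts with committed offsets can be sketched with a simplified model of a single partition. The function and parameter names below are hypothetical and only illustrate the rule described in the table: the setting applies only when the consumer group has no previous offset.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int],
    auto_offset_reset: str,
    earliest: int,
    latest: int,
) -> int:
    """Pick the offset a consumer group starts reading from on one partition."""
    if committed is not None:
        # A committed offset always wins; the reset policy is ignored.
        return committed
    if auto_offset_reset == "earliest":
        return earliest
    if auto_offset_reset == "latest":
        return latest
    raise ValueError(f"unsupported value: {auto_offset_reset!r}")
```

With no committed offset, `starting_offset(None, "earliest", 0, 120)` returns `0` and `starting_offset(None, "latest", 0, 120)` returns `120`. Once the group has committed offset `57`, both settings resume from `57`, which is why reusing a `KAFKA_GROUP_ID` doesn't replay the topic.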

## Kafka .connection settings

{% table %}
   * Instruction
   * Required
   * Description
   ---
   * `KAFKA_BOOTSTRAP_SERVERS`
   * Yes
   * Comma-separated list of one or more Kafka brokers, including port numbers.
   ---
   * `KAFKA_SECURITY_PROTOCOL`
   * Yes
   * Security protocol for the connection. Accepted values are `PLAINTEXT` and `SASL_SSL`. Default value is `SASL_SSL`.
   ---
   * `KAFKA_SASL_MECHANISM`
   * No
   * SASL mechanism to use for authentication. Supported values are `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, and `OAUTHBEARER`. Default value is `PLAIN`.
   ---
   * `KAFKA_KEY`
   * When using SASL `PLAIN` or `SCRAM`
   * Sometimes called Key, Client Key, or Username depending on the Kafka distribution.
   ---
   * `KAFKA_SECRET`
   * When using SASL `PLAIN` or `SCRAM`
   * Sometimes called Secret, Secret Key, or Password depending on the Kafka distribution.
   ---
   * `KAFKA_SASL_OAUTHBEARER_METHOD`
   * When using SASL `OAUTHBEARER`
   * Currently only `AWS` is supported.
   ---
   * `KAFKA_SASL_OAUTHBEARER_AWS_REGION`
   * When using SASL `OAUTHBEARER`
   * AWS MSK cluster region.
   ---
   * `KAFKA_SASL_OAUTHBEARER_AWS_ROLE_ARN`
   * When using SASL `OAUTHBEARER`
   * AWS role ARN to assume.
   ---
   * `KAFKA_SASL_OAUTHBEARER_AWS_EXTERNAL_ID`
   * When using SASL `OAUTHBEARER`
   * External ID used when assuming the role.
   ---
   * `KAFKA_SCHEMA_REGISTRY_URL`
   * No
   * URL of the Kafka schema registry. Used for `avro` and `json_with_schema` deserialization of
     keys and values. If Basic Auth is required, include credentials in the URL format:
     `https://<username>:<password>@<registry_host>`
   ---
   * `KAFKA_SSL_CA_PEM`
   * No
   * Content of the CA certificate in PEM format for SSL connections. [Multiline setting](/forward/dev-reference/datafiles#multiple-lines)
{% /table %}
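
If the Schema Registry credentials contain characters such as `@`, `:`, or `/`, embedding them directly in the URL produces an ambiguous address. The following Python sketch percent-encodes the credentials before building the `https://<username>:<password>@<registry_host>` form. Percent-encoding follows general URL rules; this page doesn't state how Tinybird treats special characters in the URL, so treat it as an assumption and verify against your registry. The function name is illustrative.

```python
from urllib.parse import quote


def schema_registry_url(username: str, password: str, registry_host: str) -> str:
    """Build https://<username>:<password>@<registry_host> with encoded credentials."""
    # safe="" also encodes "/", ":" and "@", which would otherwise break the URL.
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"https://{user}:{secret}@{registry_host}"
```

For example, `schema_registry_url("user", "p@ss:w/rd", "registry.example.com")` returns `https://user:p%40ss%3Aw%2Frd@registry.example.com`.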

## AWS IAM permissions

To grant Tinybird access to your Kafka cluster, you must create an AWS IAM role with two policies: an access policy and a trust policy.

### Manual configuration
{% tabs variant="code" initial="AWS Access Policy" %}
{% tab label="AWS Access Policy" %}
```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": "arn:aws:kafka:<MSK_CLUSTER_REGION>:<MSK_CLUSTER_ACCOUNT_ID>:cluster/<MSK_CLUSTER_NAME>/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:CreateTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": "arn:aws:kafka:<MSK_CLUSTER_REGION>:<MSK_CLUSTER_ACCOUNT_ID>:topic/<MSK_CLUSTER_NAME>/*/<KAFKA_TOPIC>"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": "arn:aws:kafka:<MSK_CLUSTER_REGION>:<MSK_CLUSTER_ACCOUNT_ID>:group/<MSK_CLUSTER_NAME>/*/<KAFKA_GROUP>"
        }
    ]
}
```
{% /tab %}
{% tab label="AWS Trust Policy" %}
```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "sts:AssumeRole",
            "Principal": {
                "AWS": "arn:aws:iam::<TINYBIRD_ACCOUNT_ID>:root"
            },
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "<EXTERNAL_ID>"
                }
            }
        }
    ]
}
```
{% /tab %}
- `<MSK_CLUSTER_REGION>`: AWS region where the MSK cluster is located (for example, `us-east-1`).
- `<MSK_CLUSTER_ACCOUNT_ID>`: AWS account identifier of the MSK cluster.
- `<MSK_CLUSTER_NAME>`: Name of the MSK cluster (for example, `demo-cluster-1`).
- `<KAFKA_TOPIC>`: Name of the Kafka topic to give read access. Use `*` to allow all topics.
- `<KAFKA_GROUP>`: Name of the consumer group to give read access. Use `*` to allow all groups.
- `<TINYBIRD_ACCOUNT_ID>`:
  - For Tinybird Cloud environments: Tinybird's AWS account ID, which varies depending on your region and provider
  - For Local environments: The AWS account ID of the credentials you pass to the Docker container with `--use-aws-creds`
- `<EXTERNAL_ID>`: A unique identifier for your connection.
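
Filling in the placeholders by hand is error-prone, so a small script can help. The following Python sketch is a hypothetical helper: it substitutes `<PLACEHOLDER>` tokens in a policy template, fails if any token is left unfilled, and parses the result to confirm it's valid JSON. The template is shortened to part of one statement for illustration, and `123456789012` is an example account ID.

```python
import json
import re


def fill_policy(template: str, values: dict[str, str]) -> dict:
    """Replace <PLACEHOLDER> tokens in a policy template and parse the result."""
    filled = template
    for name, value in values.items():
        filled = filled.replace(f"<{name}>", value)
    leftover = re.findall(r"<[A-Z_]+>", filled)
    if leftover:
        raise ValueError(f"unfilled placeholders: {sorted(set(leftover))}")
    return json.loads(filled)  # raises ValueError if the result isn't valid JSON


# Shortened template for illustration; use the full policy from this section.
TEMPLATE = """
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["kafka-cluster:DescribeTopic", "kafka-cluster:ReadData"],
            "Resource": "arn:aws:kafka:<MSK_CLUSTER_REGION>:<MSK_CLUSTER_ACCOUNT_ID>:topic/<MSK_CLUSTER_NAME>/*/<KAFKA_TOPIC>"
        }
    ]
}
"""

policy = fill_policy(TEMPLATE, {
    "MSK_CLUSTER_REGION": "us-east-1",
    "MSK_CLUSTER_ACCOUNT_ID": "123456789012",
    "MSK_CLUSTER_NAME": "demo-cluster-1",
    "KAFKA_TOPIC": "*",
})
print(json.dumps(policy, indent=4))
```

Failing on leftover placeholders catches a forgotten value before the policy reaches AWS, where a literal `<KAFKA_TOPIC>` in a resource ARN would match nothing.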

### Get the configuration from the API
Check [current Tinybird regions](/api-reference#current-tinybird-regions) to see the available regions and their corresponding API endpoints.

- Access Policy: `/v0/integrations/kafka/policies/read-access-policy`
  - Use the `msk_cluster_arn` parameter to limit access to a single MSK cluster.
  - Use the `topics` parameter to limit access to a comma-separated list of topics.
  - Use the `groups` parameter to limit access to a comma-separated list of consumer groups.
- Trust Policy: `/v0/integrations/kafka/policies/trust-policy`
  - Use the connection name in the `external_id_seed` parameter.

{% /tabs %}

## Kafka connector in the local environment

You can use the Kafka connector in the [Tinybird Local container](/forward/install-tinybird/local) to consume messages from a local Kafka server or a Kafka server in the cloud.

For detailed local development setup instructions, including Docker Compose examples and environment management, see the [local development guide](kafka/guides/local-development).

### Local Kafka server with Docker Compose

When using a local Kafka server, ensure the Tinybird Local container can access it. If you are running Kafka using Docker, Docker Compose is the best option to set up both Kafka and Tinybird Local in the same network. Here's an example using `apache/kafka`:

```yaml
networks:
  kafka_network:
    driver: bridge

volumes:
  kafka-data:

services:

  tinybird-local:
    image: tinybirdco/tinybird-local:latest
    container_name: tinybird-local
    platform: linux/amd64
    ports:
      - "7181:7181"
    networks:
      - kafka_network

  kafka:
    image: apache/kafka:latest
    hostname: broker
    container_name: broker
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@broker:29093"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    volumes:
      - kafka-data:/var/lib/kafka/data
    networks:
      - kafka_network
```

### Network configuration

The key points about the network configuration:

1. The example uses a bridge network (`kafka_network`) to allow communication between containers
2. The Kafka service exposes ports for both internal container communication and external access
3. Tinybird Local connects to Kafka using the internal network address
4. The bootstrap servers address in your Kafka Connection should match the `KAFKA_ADVERTISED_LISTENERS` in the `docker-compose.yml` file (for example, `kafka:29092`)

### Creating the Kafka connection and data source

The following examples use default values in the `tb_secret()` function, which are suitable for this local setup. When deploying to Tinybird Cloud, you set these secrets in the Tinybird Cloud environment instead.

Connection file `/connections/kafka_conn.connection`

```tb {% title="/connections/kafka_conn.connection" %}
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("KAFKA_PROD_SERVER", "kafka:29092") }}
KAFKA_SECURITY_PROTOCOL {{ tb_secret("KAFKA_PROD_SECURITY_PROTOCOL", "PLAINTEXT") }}
KAFKA_SASL_MECHANISM {{ tb_secret("KAFKA_PROD_SASL_MECHANISM", "PLAIN") }}
KAFKA_KEY {{ tb_secret("KAFKA_KEY", "key") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET", "secret") }}
```

Schemaless Kafka data source file `/datasources/kafka_ds.datasource`

```tb {% title="datasources/kafka_ds.datasource" %}
SCHEMA >
    `data` String `json:$`

KAFKA_CONNECTION_NAME kafka_conn
KAFKA_TOPIC sample-topic
KAFKA_GROUP_ID my_group_id
```

### Usage example

{% steps %}

#### Start the Docker containers

```bash
docker compose up
```

#### Create the `sample-topic`

```bash
docker exec -it broker /opt/kafka/bin/kafka-topics.sh --create --topic sample-topic --bootstrap-server localhost:9092

# Created topic sample-topic.
```

#### Deploy the project

```bash
tb deploy
# Running against Tinybird Local

# * Changes to be deployed:
# ------------------------------------------------------------------------
# | status | name       | type       | path                              |
# ------------------------------------------------------------------------
# | new    | kafka_ds   | datasource | datasources/kafka_ds.datasource   |
# | new    | kafka_conn | connection | connections/kafka_conn.connection |
# ------------------------------------------------------------------------
# * No changes in tokens to be deployed

# Deployment URL: http://cloud.tinybird.co/local/7181/None/deployments/1

# * Deployment submitted
# » Waiting for deployment to be ready...
# ✓ Deployment is ready
# » Removing old deployment
# ✓ Old deployment removed
# » Waiting for deployment to be promoted...
# ✓ Deployment #1 is live!
# A deployment with no data is useless. Learn how to ingest at https://www.tinybird.co/docs/forward/get-data-in
```

#### Send data to the topic and query it

```bash
echo '{"data": "test"}' | docker exec -i broker /opt/kafka/bin/kafka-console-producer.sh --topic sample-topic --bootstrap-server localhost:9092

tb sql "select * from kafka_ds"
# Running against Tinybird Local
#   data               __value   __topic                  __partition   __offset   __timestamp           __key
#   String             String    LowCardinality(String)         Int16      Int64   DateTime              String
# ───────────────────────────────────────────────────────────────────────────────────────────────────────────────
#   {"data": "test"}             sample-topic                       0          1   2025-06-20 12:17:49
```

{% /steps %}

### Docker Compose troubleshooting

If you encounter connection issues:

1. Ensure all containers are running: `docker compose ps`
2. Check container logs: `docker compose logs kafka`
3. Ensure the bootstrap servers address in your Connection file matches the `KAFKA_ADVERTISED_LISTENERS` value in your `docker-compose.yml` file.

## Kafka meta columns

When you connect a data source to Kafka, the following columns are added to store metadata from Kafka messages:

{% table %}
   * name
   * type
   * description
   ---
   * `__value`
   * `String`
   * A String representing the entire unparsed value of the Kafka message. It is only populated if `KAFKA_STORE_RAW_VALUE` is set to `True`.
   ---
   * `__topic`
   * `LowCardinality(String)`
   * The topic that the message was read from.
   ---
   * `__partition`
   * `Int16`
   * The partition that the message was read from.
   ---
   * `__offset`
   * `Int64`
   * The offset of the message.
   ---
   * `__timestamp`
   * `DateTime`
   * The timestamp of the message.
   ---
   * `__key`
   * `String`
   * The key of the message.
{% /table %}


Optionally, when `KAFKA_STORE_HEADERS` is set to `True`, the following column is added and populated:
{% table %}
   * name
   * type
   * description
   ---
   * `__headers`
   * `Map(String, String)`
   * Kafka headers of the message.
{% /table %}

{% callout type="info" %}
When you iterate your Kafka data source, you might need to use the meta columns in the
[FORWARD_QUERY](/forward/test-and-deploy/evolve-data-source#forward-query). Tinybird suggests a valid forward query that you can tweak to get the desired values for each column.
{% /callout %}

## Kafka logs

{% snippet title="kafkalogs" /%}

For comprehensive monitoring of your Kafka connectors, including lag tracking, throughput analysis, and error monitoring, see [Monitor Kafka connectors](/forward/monitoring/kafka-clickhouse-monitoring).

## Pause and resume ingestion

In branches and Tinybird Local, you can pause and resume Kafka ingestion using the CLI:

```bash
tb [--branch=BRANCH_NAME] datasource stop my_kafka_datasource
tb [--branch=BRANCH_NAME] datasource start my_kafka_datasource
```

Stopping a Data Source pauses ingestion from the Kafka topic. When you start it again, the behavior depends on the environment. In branches, a new consumer group is created and ingestion starts from the latest offset. In Tinybird Local, ingestion resumes from the last committed offset.

{% callout type="info" %}
Because each start in a branch creates a new consumer group, previous consumer groups become orphaned. Depending on your Kafka cluster's consumer group TTL (`offsets.retention.minutes`), these orphaned groups may persist until they expire.
{% /callout %}

See [tb datasource start](/forward/dev-reference/commands/tb-datasource#tb-datasource-start) and [tb datasource stop](/forward/dev-reference/commands/tb-datasource#tb-datasource-stop) for more details.

## Troubleshooting

For comprehensive troubleshooting guidance, see the [Kafka connector troubleshooting guide](kafka/troubleshooting).

Common issues include:
- Connection timeout or broker unreachable
- Authentication failures
- SSL/TLS certificate validation errors
- Deserialization errors (Avro, JSON)
- Offset and consumer group conflicts
- Consumer lag issues
- Schema mismatches

Each combination of `KAFKA_TOPIC` and `KAFKA_GROUP_ID` can only be used in one data source.
Otherwise, the offsets committed by the consumers of different data sources clash.

If you connect a data source to Kafka using a `KAFKA_TOPIC` and `KAFKA_GROUP_ID` that were
previously used by another data source in your workspace, the data source only receives data
from the last committed offset, even if `KAFKA_AUTO_OFFSET_RESET` is set to `earliest`.

To prevent these issues, always use unique `KAFKA_GROUP_ID`s when testing Kafka data sources.
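
To catch clashes before deploying, you can scan your .datasource files for repeated topic and group ID pairs. The following Python sketch is a hypothetical lint helper, not part of the Tinybird CLI. It compares the literal text of the settings, so two different `tb_secret()` calls that resolve to the same value aren't detected.

```python
from collections import defaultdict


def kafka_settings(datasource_text: str) -> dict[str, str]:
    """Collect KAFKA_* settings from the text of a .datasource file."""
    settings = {}
    for line in datasource_text.splitlines():
        parts = line.split(None, 1)
        if len(parts) == 2 and parts[0].startswith("KAFKA_"):
            # Drop trailing "# comment" text, as used in the examples on this page.
            settings[parts[0]] = parts[1].split(" #", 1)[0].strip()
    return settings


def find_clashes(files: dict[str, str]) -> dict[tuple[str, str], list[str]]:
    """Map each (topic, group ID) pair used more than once to the files using it."""
    users = defaultdict(list)
    for name, text in files.items():
        found = kafka_settings(text)
        if "KAFKA_TOPIC" in found and "KAFKA_GROUP_ID" in found:
            users[(found["KAFKA_TOPIC"], found["KAFKA_GROUP_ID"])].append(name)
    return {pair: names for pair, names in users.items() if len(names) > 1}
```

Pass `find_clashes` a dictionary that maps file names to file contents. An empty result means every data source uses a distinct pair.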

See [Kafka logs](#kafka-logs) to learn how to diagnose any other issues.

### Compressed messages

Tinybird can consume from Kafka topics where Kafka compression is turned on; decompressing the
message is a standard function of the Kafka consumer. However, if you compressed the message before passing it through the Kafka producer, Tinybird can't do post-consumer processing to decompress the message.

For example, if you compressed a JSON message through gzip and produced it to a Kafka topic as a
`bytes` message, it would be ingested by Tinybird as `bytes`. If you produced a JSON message to a
Kafka topic with the Kafka producer setting `compression.type=gzip`, while it would be stored in
Kafka as compressed bytes, it would be decoded on ingestion and arrive in Tinybird as JSON.
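
The difference is visible in the message bytes themselves. The following Python sketch uses only the standard library and no Kafka client, so it shows the payload as a consumer would receive it when the application compresses the value itself. The `is_json` helper is illustrative.

```python
import gzip
import json


def is_json(payload: bytes) -> bool:
    """Return True if the bytes decode as UTF-8 and parse as JSON."""
    try:
        json.loads(payload.decode("utf-8"))
    except ValueError:  # covers UnicodeDecodeError and json.JSONDecodeError
        return False
    return True


event = json.dumps({"action": "click"}).encode("utf-8")

# The application gzips the payload itself before producing it.
# The message value is opaque gzip bytes and stays that way after consumption.
precompressed = gzip.compress(event)

print(is_json(event))          # True
print(is_json(precompressed))  # False
```

With the producer setting `compression.type=gzip`, compression happens at the batch level inside the Kafka client, so the consumer receives the original `event` bytes and the first case applies.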

## Connecting an existing data source to Kafka

You can connect an existing, default data source to Kafka.

Create the Kafka .connection file if it does not exist, add the desired Kafka settings to the .datasource file, and add a [`FORWARD_QUERY`](/forward/test-and-deploy) to provide default values for the [Kafka meta columns](#kafka-meta-columns).

## Disconnecting a data source from Kafka

To disconnect a data source from Kafka, remove the Kafka settings from the .datasource file.

If you want to keep any of the [Kafka meta columns](#kafka-meta-columns), add them to the schema with a default value and adjust the `FORWARD_QUERY` accordingly.

## Additional resources

### Guides

- [Confluent Cloud setup](kafka/guides/confluent-cloud-setup) - End-to-end setup for Confluent Cloud
- [AWS MSK setup](kafka/guides/aws-msk-setup) - Complete MSK configuration guide
- [Redpanda setup](kafka/guides/redpanda-setup) - Redpanda-specific configuration
- [Local development](kafka/guides/local-development) - Local development setup and testing
- [CI/CD and version control](kafka/guides/cicd-version-control) - Managing connections across environments
- [Schema management](kafka/guides/schema-management) - Schema evolution and data type mapping
- [Partitioning strategies](kafka/guides/partitioning-strategies) - Best practices for Kafka partitioning
- [Message size handling](kafka/guides/message-size-handling) - Handling large messages and troubleshooting
- [Performance optimization](kafka/guides/performance-optimization) - Throughput optimization and tuning

### Reference

- [Troubleshooting guide](kafka/troubleshooting) - Comprehensive error resolution with error lookup table
- [Limits and quotas](kafka/limits) - Kafka connector limits and how to request increases

### Related documentation

- [Monitor Kafka connectors](/forward/monitoring/kafka-clickhouse-monitoring) - Monitoring queries and alerts
- [Kafka Sink](/forward/work-with-data/publish-data/sinks/kafka-sink) - Exporting data to Kafka
- [Quarantine Data Sources](/forward/get-data-in/quarantine) - Handling failed messages
