Kafka Connector

The Kafka Connector allows you to ingest data from your existing Kafka cluster and load it into Tinybird.

The Kafka Connector is fully managed and requires no additional tooling. Connect Tinybird to your Kafka cluster, choose a topic, and Tinybird will automatically begin consuming messages from Kafka.

Note that you need to grant READ permissions to both the Topic and the Consumer Group to ingest data from Kafka into Tinybird.

Using .datasource files

If you are managing your Tinybird resources in files, there are several settings available to configure the Kafka Connector in .datasource files.

  • KAFKA_CONNECTION_NAME: The name of the configured Kafka connection in Tinybird

  • KAFKA_BOOTSTRAP_SERVERS: A comma-separated list of one or more Kafka brokers (including Port numbers)

  • KAFKA_KEY: The key used to authenticate with Kafka, sometimes called Key, Client Key, or Username, depending on the Kafka distribution

  • KAFKA_SECRET: The secret used to authenticate with Kafka, sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution

  • KAFKA_TOPIC: The name of the Kafka topic to consume from

  • KAFKA_GROUP_ID: The Kafka Consumer Group ID to use when consuming from Kafka

  • KAFKA_AUTO_OFFSET_RESET: The offset to use when no previous offset can be found, e.g. when creating a new consumer. Supported values: latest, earliest. Default: latest.

  • KAFKA_STORE_RAW_VALUE: Optionally, you can store the raw message in its entirety as an additional column. Supported values: 'True', 'False'. Default: 'False'

For example, to define Data Source with a new Kafka connection in a .datasource file:

Data Source with a new Kafka connection
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

Or, to define Data Source that uses an existing Kafka connection:

Data Source with an existing Kafka connection
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

Using INCLUDE to store connection settings

To avoid configuring the same connection settings across many files, or to prevent leaking sensitive information, you can store connection details in an external file and use INCLUDE to import them into one or more .datasource files.

You can find more information about INCLUDE in the Advanced Templates documentation.

As an example, you may have two Kafka .datasource files, which re-use the same Kafka connection. You can create an include file which stores the Kafka connection details.

The Tinybird project may use the following structure:

Tinybird data project file structure
ecommerce_data_project/
    datasources/
        connections/
          my_connector_name.incl
        my_kafka_datasource.datasource
        another_datasource.datasource
    endpoints/
    pipes/

Where the file my_connector_name.incl has the following content:

Include file containing Kafka connection details
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password

And the Kafka .datasource files look like the following:

Data Source using includes for Kafka connection details
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

INCLUDE "connections/my_connection_name.incl"

KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

When using tb pull to pull a Kafka Data Source using the CLI, the KAFKA_KEY and KAFKA_SECRET settings will not be included in the file to avoid exposing credentials.

FAQs

Is the Kafka Schema Registry supported?

Yes, for decoding Avro messages. You can choose to enable Schema Registry support when connecting Tinybird to Kafka. You will be prompted to add your Schema Registry connection details, e.g. https://<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>@<SCHEMA_REGISTRY_API_URL>. However, the Kafka Data Source schema will not be defined using the Schema Registry, the Schema Registry is simply used to decode the messages.

Can Tinybird ingest compressed messages?

Yes, Tinybird can consume from Kafka topics where Kafka compression has been enabled, as decompressing the message is a standard function of the Kafka Consumer.

However, if you compressed the message before passing it through the Kafka Producer, then Tinybird cannot do post-Consumer processing to decompress the message.

For example, if you compressed a JSON message through gzip and produced it to a Kafka topic as a bytes message, it would be ingested by Tinybird as bytes. If you produced a JSON message to a Kafka topic with the Kafka Producer setting compression.type=gzip, while it would be stored in Kafka as compressed bytes, it would be decoded on ingestion and arrive to Tinybird as JSON.