The Confluent Connector allows you to ingest data from your existing Confluent Cloud cluster and load it into Tinybird.
The Confluent Connector is fully managed and requires no additional tooling. Connect Tinybird to your Confluent Cloud cluster, choose a topic, and Tinybird will automatically begin consuming messages from Confluent Cloud.
Using .datasource files
If you manage your Tinybird resources in files, several settings are available to configure the Confluent Connector in .datasource files:
KAFKA_CONNECTION_NAME: The name of the configured Confluent Cloud connection in Tinybird
KAFKA_BOOTSTRAP_SERVERS: The comma-separated list of bootstrap servers (including port numbers)
KAFKA_KEY: The Key component of the Confluent Cloud API Key
KAFKA_SECRET: The Secret component of the Confluent Cloud API Key
KAFKA_TOPIC: The name of the Kafka topic to consume from
KAFKA_GROUP_ID: The Kafka Consumer Group ID to use when consuming from Confluent Cloud
KAFKA_AUTO_OFFSET_RESET: The offset to use when no previous offset can be found, e.g. when creating a new consumer. Supported values:
KAFKA_STORE_RAW_VALUE: Optionally, you can store the raw message in its entirety as an additional column. Supported values:
For example, to define a Data Source with a new Confluent Cloud connection in a .datasource file:
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
Or, to define a Data Source that uses an existing Confluent Cloud connection:
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
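The difference between the two examples above is which KAFKA_* settings the file carries. As a rough illustration only, the following Python sketch reads the text of a .datasource file and reports whether it defines a new connection or refers to an existing one. The function names and the NEW_CONNECTION_KEYS set are hypothetical helpers inferred from the two examples; they are not part of Tinybird or its CLI.

```python
def kafka_settings(datafile_text: str) -> dict:
    """Collect KAFKA_* settings from the text of a .datasource file."""
    settings = {}
    for line in datafile_text.splitlines():
        parts = line.strip().split(None, 1)
        if len(parts) == 2 and parts[0].startswith("KAFKA_"):
            settings[parts[0]] = parts[1]
    return settings


# Assumption: going by the two examples above, these settings appear
# only when the file defines a new connection.
NEW_CONNECTION_KEYS = {"KAFKA_BOOTSTRAP_SERVERS", "KAFKA_KEY", "KAFKA_SECRET"}


def defines_new_connection(datafile_text: str) -> bool:
    """True if the file carries the full set of new-connection settings."""
    return NEW_CONNECTION_KEYS.issubset(kafka_settings(datafile_text))


new_connection = """\
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
"""

existing_connection = """\
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
"""
```

A check like this could run in a pre-commit hook to flag files that embed credentials directly.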
Using INCLUDE to store connection settings
To avoid repeating the same connection settings across many files, or to prevent leaking sensitive information, you can store connection details in an external file and use INCLUDE to import them into one or more .datasource files.
You can find more information about INCLUDE in the Advanced Templates documentation.
As an example, you may have two Confluent Cloud .datasource files that reuse the same Confluent Cloud connection. You can create an include file that stores the Confluent Cloud connection details.
The Tinybird project may use the following structure:
ecommerce_data_project/
    datasources/
        connections/
            my_connector_name.incl
        my_confluent_datasource.datasource
        another_datasource.datasource
    endpoints/
    pipes/
Where the file my_connector_name.incl has the following content:
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
And the Confluent Cloud .datasource files look like the following:
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

INCLUDE "connections/my_connector_name.incl"

KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
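One way to picture what INCLUDE does is as a textual substitution: the INCLUDE line is replaced by the content of the referenced file. The Python sketch below illustrates that idea on a temporary copy of the project layout above. It is not Tinybird's implementation; expand_includes and demo are hypothetical names, and resolving the path relative to the including file's directory is an assumption inferred from the example layout.

```python
import re
import tempfile
from pathlib import Path

# Matches a line of the form: INCLUDE "relative/path.incl"
INCLUDE_RE = re.compile(r'^\s*INCLUDE\s+"([^"]+)"\s*$')


def expand_includes(datafile: Path) -> str:
    """Return the datafile text with each INCLUDE line replaced by the
    content of the referenced file.

    Assumption: include paths are relative to the datafile's directory.
    """
    out = []
    for line in datafile.read_text().splitlines():
        match = INCLUDE_RE.match(line)
        if match:
            included = datafile.parent / match.group(1)
            out.extend(included.read_text().splitlines())
        else:
            out.append(line)
    return "\n".join(out)


def demo() -> str:
    """Build a throwaway project tree and expand one datafile."""
    with tempfile.TemporaryDirectory() as tmp:
        datasources = Path(tmp) / "datasources"
        (datasources / "connections").mkdir(parents=True)
        (datasources / "connections" / "my_connector_name.incl").write_text(
            "KAFKA_CONNECTION_NAME my_connection_name\n"
            "KAFKA_BOOTSTRAP_SERVERS my_server:9092\n"
            "KAFKA_KEY my_username\n"
            "KAFKA_SECRET my_password\n"
        )
        datafile = datasources / "my_confluent_datasource.datasource"
        datafile.write_text(
            'INCLUDE "connections/my_connector_name.incl"\n'
            "KAFKA_TOPIC my_topic\n"
            "KAFKA_GROUP_ID my_group_id\n"
        )
        return expand_includes(datafile)
```

Because the credentials live only in the include file, that one file can be excluded from version control while the .datasource files are committed.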
When you use tb pull to pull a Confluent Cloud Data Source using the CLI, the KAFKA_KEY and KAFKA_SECRET settings will not be included in the file to avoid exposing credentials.
Is the Confluent Cloud Schema Registry supported?
Yes, for decoding Avro messages. You can choose to enable Schema Registry support when connecting Tinybird to Confluent Cloud. You will be prompted to add your Schema Registry connection details, e.g.
However, the Confluent Cloud Data Source schema will not be defined using the Schema Registry; the Schema Registry is used only to decode the messages.
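For background on why a registry is needed for decoding at all: messages written by Confluent's Schema Registry serializers carry a small header in front of the Avro payload, consisting of one magic byte (0) followed by a 4-byte big-endian schema ID. A consumer uses that ID to fetch the writer schema before decoding. The Python sketch below only splits that header from the payload; split_confluent_frame is a hypothetical helper name, and how Tinybird handles this step internally is not described here.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Schema Registry framed message


def split_confluent_frame(message: bytes):
    """Split a framed message into (schema_id, avro_payload).

    Layout: 1 magic byte, 4-byte big-endian schema ID, then the Avro data.
    """
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("not a Schema Registry framed message")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]


# Build a sample frame with schema ID 42 and a placeholder payload.
sample = bytes([MAGIC_BYTE]) + struct.pack(">I", 42) + b"avro-bytes"
schema_id, payload = split_confluent_frame(sample)
```

The payload itself is still Avro-encoded at this point; decoding it requires the schema that the ID refers to.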
Can Tinybird ingest compressed messages?
Yes, Tinybird can consume from Kafka topics where Kafka compression has been enabled, as decompressing the message is a standard function of the Kafka Consumer.
However, if you compressed the message before passing it through the Kafka Producer, then Tinybird cannot do post-Consumer processing to decompress the message.
For example, if you compressed a JSON message with gzip and produced it to a Kafka topic as a bytes message, Tinybird would ingest it as bytes. If you instead produced a JSON message to a Kafka topic with the Kafka Producer setting compression.type=gzip, it would be stored in Kafka as compressed bytes but decoded on ingestion, and it would arrive in Tinybird as JSON.
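The two cases can be told apart by looking at the message value a consumer receives. The Python sketch below uses only the standard library and no Kafka client, so the producer-level case is modeled by the value arriving unchanged, which is how Kafka's batch compression behaves from the consumer's point of view. The event content and the is_json helper are illustrative assumptions.

```python
import gzip
import json

event = {"order_id": 1, "status": "paid"}
plain_value = json.dumps(event).encode("utf-8")

# Case 1: the application gzips the value before producing it.
# The message value itself is gzip data, and a consumer sees it as-is.
precompressed_value = gzip.compress(plain_value)

# Case 2: producer-level compression (compression.type=gzip).
# The Kafka client compresses on the way in and the consumer decompresses
# on the way out, so the consumer sees the original value.
value_with_producer_compression = plain_value


def is_json(value: bytes) -> bool:
    """Return True if the bytes parse as a JSON document."""
    try:
        json.loads(value)
        return True
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return False
```

In the first case the value would only become JSON again after an explicit gzip.decompress step, which is the post-consumer processing that Tinybird does not perform.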