Instruction | Required | Description |
---|---|---|
KAFKA_CONNECTION_NAME | Yes | The name of the configured Kafka connection in Tinybird. |
KAFKA_BOOTSTRAP_SERVERS | Yes | Comma-separated list of one or more Kafka brokers, including port numbers. |
KAFKA_KEY | Yes | Key used to authenticate with Kafka. Sometimes called Key, Client Key, or Username, depending on the Kafka distribution. |
KAFKA_SECRET | Yes | Secret used to authenticate with Kafka. Sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution. |
KAFKA_TOPIC | Yes | Name of the Kafka topic to consume from. |
KAFKA_GROUP_ID | Yes | Consumer group ID to use when consuming from Kafka. |
KAFKA_AUTO_OFFSET_RESET | No | Offset to use when no previous offset can be found, for example when creating a new consumer. Supported values are latest and earliest. Default: latest. |
KAFKA_STORE_HEADERS | No | Store Kafka headers as field __headers for later processing. Default: 'False'. |
KAFKA_STORE_BINARY_HEADERS | No | Stores all Kafka headers as binary data in field __headers as a binary map of type Map(String, String). To access the header 'key' run: __headers['key']. Default: 'True'. This field only applies if KAFKA_STORE_HEADERS is set to True. |
KAFKA_STORE_RAW_VALUE | No | Stores the raw message in its entirety as an additional column. Supported values are 'True' and 'False'. Default: 'False'. |
KAFKA_SCHEMA_REGISTRY_URL | No | URL of the Kafka schema registry. |
KAFKA_TARGET_PARTITIONS | No | Target partitions to place the messages. |
KAFKA_KEY_AVRO_DESERIALIZATION | No | Key for decoding Avro messages. |
KAFKA_SSL_CA_PEM | No | CA certificate in PEM format for SSL connections. |
KAFKA_SASL_MECHANISM | No | SASL mechanism to use for authentication. Supported values are 'PLAIN', 'SCRAM-SHA-256', and 'SCRAM-SHA-512'. Default: 'PLAIN'. |
The following example defines a Data Source with a new Kafka, Confluent, or RedPanda connection in a .datasource file:
Data Source with a new Kafka/Confluent/RedPanda connection
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
The following example defines a Data Source that uses an existing Kafka, Confluent, or RedPanda connection:
Data Source with an existing Kafka/Confluent/RedPanda connection
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
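Optional settings go in the same .datasource file, after the connection settings. The following sketch builds on the previous example and assumes you want to start reading from the earliest offset and keep Kafka headers for later processing; the connection, topic, and group names are placeholders, and the quoting of values follows the defaults listed in the table above.
Data Source with optional Kafka settings (illustrative sketch)
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
KAFKA_AUTO_OFFSET_RESET earliest
KAFKA_STORE_HEADERS 'True'
With headers stored, an individual header value can later be read with map access, for example __headers['key'], as described in the table above.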
Refer to the Kafka Connector, Amazon MSK, Confluent Connector, or RedPanda Connector documentation for more details.
BigQuery
The BigQuery connector uses the following settings:
Instruction | Required | Description |
---|---|---|
IMPORT_SERVICE | Yes | Name of the import service to use. Use bigquery. |
IMPORT_SCHEDULE | Yes | Cron expression, in UTC time, with the frequency to run imports. The schedule can't run more often than once every 5 minutes. For example, */5 * * * *. Use @auto to sync once per minute when using s3, or @on-demand to only run manually. |
IMPORT_CONNECTION_NAME | Yes | Name given to the connection inside Tinybird. For example, 'my_connection'. |
IMPORT_STRATEGY | Yes | Strategy to use when inserting data. Use REPLACE for BigQuery. |
IMPORT_EXTERNAL_DATASOURCE | No | Fully qualified name of the source table in BigQuery. For example, project.dataset.table. |
IMPORT_QUERY | No | The SELECT query to retrieve your data from BigQuery when you don't need all the columns or want to make a transformation before ingest. The FROM clause must reference a table using the full scope. For example, project.dataset.table. |
See BigQuery Connector for more details.
BigQuery example
The following example shows a BigQuery Data Source described in a .datasource file:
Data Source with a BigQuery connection
DESCRIPTION >
    bigquery demo data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `id` Integer `json:$.id`,
    `orderid` LowCardinality(String) `json:$.orderid`,
    `status` LowCardinality(String) `json:$.status`,
    `amount` Integer `json:$.amount`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE bigquery
IMPORT_SCHEDULE */5 * * * *
IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
IMPORT_STRATEGY REPLACE
IMPORT_QUERY >
    select timestamp, id, orderid, status, amount from mydb.raw.events
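IMPORT_QUERY can also filter rows before ingest when you don't need everything in the source table. The following fragment is a sketch rather than a canonical example: it assumes the same mydb.raw.events table and schema as above, and the status value 'delivered' is hypothetical. Only the import settings are shown; the DESCRIPTION, SCHEMA, and ENGINE settings stay as in the previous example.
BigQuery import settings with a filtering IMPORT_QUERY (sketch)
IMPORT_SERVICE bigquery
IMPORT_SCHEDULE */5 * * * *
IMPORT_STRATEGY REPLACE
IMPORT_QUERY >
    select timestamp, id, orderid, status, amount
    from mydb.raw.events
    where status = 'delivered'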
S3
The S3 connector uses the following settings:
Instruction | Required | Description |
---|---|---|
IMPORT_SERVICE | Yes | Name of the import service to use. Use s3 for S3 connections. |
IMPORT_CONNECTION_NAME | Yes | Name given to the connection inside Tinybird. For example, 'my_connection'. |
IMPORT_STRATEGY | Yes | Strategy to use when inserting data. Use APPEND for S3 connections. |
IMPORT_BUCKET_URI | Yes | Full bucket path, including the s3:// protocol, bucket name, object path, and an optional pattern to match against object keys. For example, s3://my-bucket/my-path discovers all files in the bucket my-bucket under the prefix /my-path. You can use patterns in the path to filter objects, for example, ending the path with *.csv matches all objects that end with the .csv suffix. |
IMPORT_FROM_DATETIME | No | Sets the date and time from which to start ingesting files on an S3 bucket. The format is YYYY-MM-DDTHH:MM:SSZ. |
See S3 Connector for more details.
S3 example
The following example shows an S3 Data Source described in a .datasource file:
tinybird/datasources/s3.datasource - Data Source with an S3 connection
DESCRIPTION >
    Analytics events landing data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `session_id` String `json:$.session_id`,
    `action` LowCardinality(String) `json:$.action`,
    `version` LowCardinality(String) `json:$.version`,
    `payload` String `json:$.payload`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE s3
IMPORT_CONNECTION_NAME connection_name
IMPORT_BUCKET_URI s3://my-bucket/*.csv
IMPORT_SCHEDULE @auto
IMPORT_STRATEGY APPEND
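To skip objects created before a given point in time, IMPORT_FROM_DATETIME can be added to the same file. The fragment below is a sketch that keeps the rest of the example unchanged; the cut-off date is a hypothetical value in the YYYY-MM-DDTHH:MM:SSZ format described in the table above.
S3 import settings with a start date (sketch)
IMPORT_SERVICE s3
IMPORT_CONNECTION_NAME connection_name
IMPORT_BUCKET_URI s3://my-bucket/*.csv
IMPORT_SCHEDULE @auto
IMPORT_STRATEGY APPEND
IMPORT_FROM_DATETIME 2024-01-01T00:00:00Z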