| Instruction | Required | Description |
| --- | --- | --- |
| KAFKA_CONNECTION_NAME | Yes | The name of the configured Kafka connection in Tinybird. |
| KAFKA_BOOTSTRAP_SERVERS | Yes | Comma-separated list of one or more Kafka brokers, including port numbers. |
| KAFKA_KEY | Yes | Key used to authenticate with Kafka. Sometimes called Key, Client Key, or Username, depending on the Kafka distribution. |
| KAFKA_SECRET | Yes | Secret used to authenticate with Kafka. Sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution. |
| KAFKA_TOPIC | Yes | Name of the Kafka topic to consume from. |
| KAFKA_GROUP_ID | Yes | Consumer group ID to use when consuming from Kafka. |
| KAFKA_AUTO_OFFSET_RESET | No | Offset to use when no previous offset can be found, for example when creating a new consumer. Supported values are 'latest' and 'earliest'. Default: 'latest'. |
| KAFKA_STORE_HEADERS | No | Store Kafka headers in the field __headers for later processing (see the sketch after this table). Default: 'False'. |
| KAFKA_STORE_BINARY_HEADERS | No | Store all Kafka headers as binary data in the field __headers, a map of type Map(String, String). To access the header 'key', use __headers['key']. Default: 'True'. Only applies when KAFKA_STORE_HEADERS is set to 'True'. |
| KAFKA_STORE_RAW_VALUE | No | Store the raw message in its entirety as an additional column. Supported values are 'True' and 'False'. Default: 'False'. |
| KAFKA_SCHEMA_REGISTRY_URL | No | URL of the Kafka schema registry. |
| KAFKA_TARGET_PARTITIONS | No | Target partitions in which to place the messages. |
| KAFKA_KEY_AVRO_DESERIALIZATION | No | Key for decoding Avro messages. |
| KAFKA_SSL_CA_PEM | No | CA certificate in PEM format for SSL connections. |
| KAFKA_SASL_MECHANISM | No | SASL mechanism to use for authentication. Supported values are 'PLAIN', 'SCRAM-SHA-256', and 'SCRAM-SHA-512'. Default: 'PLAIN'. |
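
Optional settings sit in the same .datasource file, next to the required ones. The following fragment is a sketch rather than a complete file; the values are illustrative and assume the required settings shown in the examples below:

KAFKA_AUTO_OFFSET_RESET earliest
KAFKA_STORE_HEADERS True
KAFKA_SASL_MECHANISM SCRAM-SHA-256

With KAFKA_STORE_HEADERS set to 'True' and KAFKA_STORE_BINARY_HEADERS left at its default of 'True', an individual header can later be read with map access, for example __headers['key'].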

The following example defines a Data Source with a new Kafka, Confluent, or RedPanda connection in a .datasource file:

Data Source with a new Kafka/Confluent/RedPanda connection
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

The following example defines a Data Source that uses an existing Kafka, Confluent, or RedPanda connection:

Data Source with an existing Kafka/Confluent/RedPanda connection
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
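
Once data is flowing, the Kafka metadata columns can be queried like any other column. As a sketch, assuming the Data Source above is named kafka_ds, the following query reports the latest consumed offset per partition:

SELECT topic, partition, max(offset) AS last_offset
FROM kafka_ds
GROUP BY topic, partition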

Refer to the Kafka Connector, Amazon MSK, Confluent Connector, or RedPanda Connector documentation for more details.

BigQuery

The BigQuery connector uses the following settings:

| Instruction | Required | Description |
| --- | --- | --- |
| IMPORT_SERVICE | Yes | Name of the import service to use. Use bigquery. |
| IMPORT_SCHEDULE | Yes | Cron expression, in UTC, that sets how often imports run (see the note after this table). The interval must be 5 minutes or higher, for example */5 * * * *. Use @auto to sync once per minute when using s3, or @on-demand to only run manually. |
| IMPORT_CONNECTION_NAME | Yes | Name given to the connection inside Tinybird. For example, 'my_connection'. |
| IMPORT_STRATEGY | Yes | Strategy to use when inserting data. Use REPLACE for BigQuery. |
| IMPORT_EXTERNAL_DATASOURCE | No | Fully qualified name of the source table in BigQuery. For example, project.dataset.table. |
| IMPORT_QUERY | No | The SELECT query that retrieves your data from BigQuery when you don't need all the columns or want to transform the data before ingestion. The FROM clause must reference the table by its full scope. For example, project.dataset.table. |
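
For reference, the five cron fields are minute, hour, day of month, month, and day of week. As an illustrative sketch, the following schedule runs an import at the start of every second hour:

IMPORT_SCHEDULE 0 */2 * * *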

See BigQuery Connector for more details.

BigQuery example

The following example shows a BigQuery Data Source described in a .datasource file:

Data Source with a BigQuery connection
DESCRIPTION >
    bigquery demo data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `id` Integer `json:$.id`,
    `orderid` LowCardinality(String) `json:$.orderid`,
    `status` LowCardinality(String) `json:$.status`,
    `amount` Integer `json:$.amount`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE bigquery
IMPORT_SCHEDULE */5 * * * *
IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
IMPORT_STRATEGY REPLACE
IMPORT_QUERY >
    select
        timestamp,
        id,
        orderid,
        status,
        amount
    from mydb.raw.events
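
The IMPORT_QUERY can also transform the data before it lands in Tinybird. The following variation is a sketch: the upper() call and the status filter are hypothetical, and the table name follows the example above:

IMPORT_QUERY >
    select
        timestamp,
        id,
        orderid,
        upper(status) as status,
        amount
    from mydb.raw.events
    where status != 'cancelled'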

S3

The S3 connector uses the following settings:

| Instruction | Required | Description |
| --- | --- | --- |
| IMPORT_SERVICE | Yes | Name of the import service to use. Use s3 for S3 connections. |
| IMPORT_CONNECTION_NAME | Yes | Name given to the connection inside Tinybird. For example, 'my_connection'. |
| IMPORT_STRATEGY | Yes | Strategy to use when inserting data. Use APPEND for S3 connections. |
| IMPORT_BUCKET_URI | Yes | Full bucket path, including the s3:// protocol, bucket name, object path, and an optional pattern to match against object keys. For example, s3://my-bucket/my-path discovers all files in the bucket my-bucket under the prefix /my-path. You can use patterns in the path to filter objects; for example, ending the path with *.csv matches all objects with the .csv suffix (see the patterns after this table). |
| IMPORT_FROM_DATETIME | No | Date and time from which to start ingesting files from the S3 bucket, in YYYY-MM-DDTHH:MM:SSZ format. |
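
For example, these two URI patterns differ in scope: the first discovers every object under the prefix, while the second restricts discovery to CSV files. Bucket and prefix names are illustrative:

IMPORT_BUCKET_URI s3://my-bucket/my-path
IMPORT_BUCKET_URI s3://my-bucket/my-path/*.csv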

See S3 Connector for more details.

S3 example

The following example shows an S3 Data Source described in a .datasource file:

tinybird/datasources/s3.datasource - Data Source with an S3 connection
DESCRIPTION >
    Analytics events landing data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `session_id` String `json:$.session_id`,
    `action` LowCardinality(String) `json:$.action`,
    `version` LowCardinality(String) `json:$.version`,
    `payload` String `json:$.payload`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE s3
IMPORT_CONNECTION_NAME connection_name
IMPORT_BUCKET_URI s3://my-bucket/*.csv
IMPORT_SCHEDULE @auto
IMPORT_STRATEGY APPEND
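
To skip older objects during the initial sync, IMPORT_FROM_DATETIME can be added to the same file. The following variation is a sketch with a placeholder date:

IMPORT_SERVICE s3
IMPORT_CONNECTION_NAME connection_name
IMPORT_BUCKET_URI s3://my-bucket/*.csv
IMPORT_FROM_DATETIME 2024-01-01T00:00:00Z
IMPORT_SCHEDULE @auto
IMPORT_STRATEGY APPEND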
