Datafiles¶
Datafiles are plain-text files that describe your Tinybird resources: Data Sources, Pipes, etc.
Datafiles let you manage your Data Project as source code: you can version control and collaborate through Git, and use the CLI to manage your Workspaces or automate processes such as CI/CD pipelines.
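A Data Project is just a set of Datafiles organized in folders. One possible layout (a sketch; the folder and file names are only illustrative, chosen to match the paths used in the examples below):
datasources/
    analytics_events.datasource
connections/
    kafka_connection.incl
pipes/
    endpoint_pipe.pipe
includes/
    only_buy_events.incl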
Basic syntax¶
Datafiles follow a really simple syntax:
CMD value
OTHER_CMD "value with multiple words"
or
CMD >
    multi
    line
    values
    are indented
Two simple examples, one generated from a CSV file and one from an NDJSON file:
DESCRIPTION generated from /Users/username/tmp/sample.csv
SCHEMA >
`d` DateTime,
`total` Int32,
`from_novoa` Int16
DESCRIPTION generated from /Users/username/tmp/sample.ndjson
SCHEMA >
`d` DateTime `json:$.d`,
`total` Int32 `json:$.total`,
`from_novoa` Int16 `json:$.from_novoa`
Read more about JSONPaths.
Types of Datafiles¶
.datasource
represents a Data Source and should follow the Data Source syntax.
.pipe
represents a Pipe and should follow the Pipe syntax.
.incl
Include Datafiles are reusable Datafile parts that can be included either in .datasource or .pipe files. They might include connection credentials, nodes, etc.
Find below the complete Datafile commands reference and some usage examples.
Common reference¶
VERSION <integer_number>
- Defines the version of the resource; it can be used both for Data Sources and Pipes. Use it at the top of the Datafile.
VERSION 1
SCHEMA >
`d` DateTime `json:$.d`,
`total` Int32 `json:$.total`
Learn how to version resources in this guide.
Data Source¶
SCHEMA <schema_definition>
- (Required) Defines a block for a Data Source schema; only valid for .datasource files. The block has to be indented.
DESCRIPTION <markdown_string>
- (Optional) Sets the description for the Data Source.
TOKEN <token_name> APPEND
- (Optional) Grants append access to a Data Source to the token with name <token_name>. If it does not exist it’ll be automatically created.
ENGINE <engine_type>
- (Optional) Sets the ClickHouse Engine for the Data Source. Default: MergeTree.
ENGINE_SORTING_KEY <sql>
- (Optional) Sets the ClickHouse ORDER BY expression for the Data Source. If not set, it defaults to a DateTime, numeric, or String column, in that order.
ENGINE_PARTITION_KEY <sql>
- (Optional) Sets the ClickHouse PARTITION expression for the Data Source.
ENGINE_TTL <sql>
- (Optional) Sets the ClickHouse TTL expression for the Data Source.
ENGINE_VER <column_name>
- (Optional) Required when ENGINE ReplacingMergeTree. The column with the version of the object state.
ENGINE_SIGN <column_name>
- (Optional) Required when ENGINE CollapsingMergeTree or ENGINE VersionedCollapsingMergeTree. The column to compute the state.
ENGINE_VERSION <column_name>
- (Optional) Required when ENGINE VersionedCollapsingMergeTree. The column with the version of the object state.
VERSION 0
TOKEN tracker APPEND
DESCRIPTION >
Analytics events **landing data source**
SCHEMA >
`timestamp` DateTime `json:$.timestamp`,
`session_id` String `json:$.session_id`,
`action` LowCardinality(String) `json:$.action`,
`version` LowCardinality(String) `json:$.version`,
`payload` String `json:$.payload`
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"
SCHEMA¶
A SCHEMA definition is a comma-separated list of column definitions, one per line.
Each column is represented as `column_name` data_type json_path default_value, where column_name is the name of the column in the Data Source and data_type is one of the supported Data Types.
json_path is optional and only required for NDJSON Data Sources; read more about JSONPaths.
default_value is used to set a default value for the column when it’s null. It’s not supported on NDJSON Data Sources. A common use case is to set a default date on a column, like this: updated_at DateTime DEFAULT now().
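For instance, a sketch of a CSV Data Source schema using a default value (the column names are only illustrative):
SCHEMA >
    `product_id` Int32,
    `price` Float32,
    `updated_at` DateTime DEFAULT now()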
ENGINE and settings¶
Engine can be one of: MergeTree, ReplacingMergeTree, SummingMergeTree, AggregatingMergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree, Null.
Read the supported engine and settings documentation for more info.
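For example, a deduplicating Data Source on a ReplacingMergeTree engine needs ENGINE_VER, as described in the reference above. A minimal sketch (column and key names are only illustrative):
SCHEMA >
    `product_id` Int32,
    `price` Float32,
    `updated_at` DateTime

ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "product_id"
ENGINE_VER "updated_at"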
Connectors¶
You describe connectors as part of the .datasource
Datafile. You can use `Include`
to better handle connection settings and credentials.
Kafka and Confluent¶
KAFKA_CONNECTION_NAME
: (Required) The name of the configured Kafka connection in Tinybird
KAFKA_BOOTSTRAP_SERVERS
: (Required) A comma-separated list of one or more Kafka brokers (including Port numbers)
KAFKA_KEY
: (Required) The key used to authenticate with Kafka, sometimes called Key, Client Key, or Username, depending on the Kafka distribution
KAFKA_SECRET
: (Required) The secret used to authenticate with Kafka, sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution
KAFKA_TOPIC
: (Required) The name of the Kafka topic to consume from
KAFKA_GROUP_ID
: (Required) The Kafka Consumer Group ID to use when consuming from Kafka
KAFKA_AUTO_OFFSET_RESET
: (Optional) The offset to use when no previous offset can be found, e.g. when creating a new consumer. Supported values: latest, earliest. Default: latest.
KAFKA_STORE_RAW_VALUE
: (Optional) Stores the raw message in its entirety as an additional column. Supported values: 'True', 'False'. Default: 'False'
KAFKA_SCHEMA_REGISTRY_URL
: (Optional) URL of the Kafka schema registry
KAFKA_TARGET_PARTITIONS
: (Optional)
KAFKA_STORE_HEADERS
: (Optional)
KAFKA_KEY_AVRO_DESERIALIZATION
: (Optional)
For example, to define a Data Source with a new Kafka connection in a .datasource file:
SCHEMA >
`value` String,
`topic` LowCardinality(String),
`partition` Int16,
`offset` Int64,
`timestamp` DateTime,
`key` String
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
Or, to define a Data Source that uses an existing Kafka connection:
SCHEMA >
`value` String,
`topic` LowCardinality(String),
`partition` Int16,
`offset` Int64,
`timestamp` DateTime,
`key` String
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
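The optional settings from the reference above go in the same block. A sketch overriding the offset reset and raw value storage (values are only illustrative):
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
KAFKA_AUTO_OFFSET_RESET earliest
KAFKA_STORE_RAW_VALUE 'True'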
Please refer to the Kafka and Confluent connectors reference for more details about how to configure them.
Scheduled Connectors¶
IMPORT_SERVICE
: (Required) name of the import service to use, valid values: bigquery, snowflake, s3
IMPORT_SCHEDULE
: (Required) a cron expression with the frequency to run imports, must be higher than 5 minutes, e.g. */5 * * * *. Use @auto to sync once per minute (only available for s3), or @on-demand to only execute manually
IMPORT_CONNECTION_NAME
: (Required) the name given to the connection inside Tinybird, e.g. 'my_connection'
IMPORT_STRATEGY
: (Required) the strategy to use when inserting data, either REPLACE (for BigQuery and Snowflake) or APPEND (for s3).
IMPORT_BUCKET_URI
: (Required) when IMPORT_SERVICE s3. A full bucket path, including the s3:// protocol, bucket name, object path and an optional pattern to match against object keys. For example, s3://my-bucket/my-path would discover all files in the bucket my-bucket under the prefix /my-path. You can use patterns in the path to filter objects, for example, ending the path with *.csv will match all objects that end with the .csv suffix.
IMPORT_EXTERNAL_DATASOURCE
: (Optional) the fully qualified name of the source table in BigQuery or Snowflake, e.g. project.dataset.table
IMPORT_QUERY
: (Optional) the SELECT query to extract your data from BigQuery or Snowflake when you don’t need all the columns or want to make a transformation before ingestion. The FROM must reference a table using the full scope: project.dataset.table
Please refer to the BigQuery, Snowflake and S3 connectors reference for more details about how to configure them.
BigQuery¶
DESCRIPTION >
bigquery demo data source
SCHEMA >
`timestamp` DateTime `json:$.timestamp`,
`id` Integer `json:$.id`,
`orderid` LowCardinality(String) `json:$.orderid`,
`status` LowCardinality(String) `json:$.status`,
`amount` Integer `json:$.amount`
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"
IMPORT_SERVICE bigquery
IMPORT_SCHEDULE */5 * * * *
IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
IMPORT_STRATEGY REPLACE
IMPORT_QUERY >
select
timestamp,
id,
orderid,
status,
amount
from
mydb.raw.events
Snowflake¶
DESCRIPTION >
Snowflake demo data source
SCHEMA >
`timestamp` DateTime `json:$.timestamp`,
`id` Integer `json:$.id`,
`orderid` LowCardinality(String) `json:$.orderid`,
`status` LowCardinality(String) `json:$.status`,
`amount` Integer `json:$.amount`
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"
IMPORT_SERVICE snowflake
IMPORT_CONNECTION_NAME my_snowflake_connection
IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
IMPORT_SCHEDULE */5 * * * *
IMPORT_STRATEGY REPLACE
IMPORT_QUERY >
select
timestamp,
id,
orderid,
status,
amount
from
mydb.raw.events
S3¶
DESCRIPTION >
Analytics events landing data source
SCHEMA >
`timestamp` DateTime `json:$.timestamp`,
`session_id` String `json:$.session_id`,
`action` LowCardinality(String) `json:$.action`,
`version` LowCardinality(String) `json:$.version`,
`payload` String `json:$.payload`
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"
IMPORT_SERVICE s3
IMPORT_CONNECTION_NAME connection_name
IMPORT_BUCKET_URI s3://my-bucket/*.csv
IMPORT_SCHEDULE @auto
IMPORT_STRATEGY APPEND
Pipe¶
%
- (Optional) Use as the first character of a node to indicate the node uses the templating system
DESCRIPTION <markdown_string>
- (Optional) Sets the description for a node or the complete file
NODE <node_name>
- (Required) Starts the definition of a new node; all the commands until a new NODE command or the end of the file will be related to this node
SQL <sql>
- (Required) Defines a block for the SQL of a node. The block has to be indented (refer to the examples above).
INCLUDE <include_path.incl> <variables>
- (Optional) Includes are pieces of a Pipe that can be reused in multiple Pipe Datafiles.
TYPE <pipe_type>
- (Optional) Sets the type of the node. Can be set to 'MATERIALIZED' or 'COPY'
DATASOURCE <data_source_name>
- (Required) when TYPE MATERIALIZED. Sets the destination Data Source for materialized nodes
TARGET_DATASOURCE <data_source_name>
- (Required) when TYPE COPY. Sets the destination Data Source for copy nodes.
TOKEN <token_name> READ
- (Optional) Grants read access to a Pipe/Endpoint to the token with name <token_name>. If it does not exist it’ll be automatically created.
COPY_SCHEDULE
- (Optional) A cron expression with the frequency to run copy jobs, must be higher than 5 minutes, e.g. */5 * * * *. If not defined, it defaults to @on-demand.
Materialized Pipe¶
Use it to define how each row ingested into the left-most Data Source of the Pipe query is materialized into a target Data Source. Materialization happens on ingestion. More about Materialized Views.
DESCRIPTION materialized pipe to aggregate sales per hour in the sales_by_hour Data Source
NODE daily_sales
SQL >
SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales
FROM teams
GROUP BY day, country
TYPE MATERIALIZED
DATASOURCE sales_by_hour
Copy Pipe¶
Use it to define how to export the result of the Pipe to a Data Source, optionally with a schedule. More about Copy Pipes.
DESCRIPTION copy pipe to export sales hour every hour to the sales_hour_copy Data Source
NODE daily_sales
SQL >
%
SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales
FROM teams
WHERE
day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now())
and country = {{ String(country, 'US') }}
GROUP BY day, country
TYPE COPY
COPY_SCHEDULE 0 * * * *
TARGET_DATASOURCE sales_hour_copy
API Endpoint Pipe¶
Use it to publish the result of the Pipe as an HTTP API Endpoint. More about API Endpoints.
TOKEN dashboard READ
DESCRIPTION endpoint to get sales by hour filtering by date and country
NODE daily_sales
SQL >
%
SELECT day, country, sum(total_sales) as total_sales
FROM sales_by_hour
WHERE
day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now())
and country = {{ String(country, 'US') }}
GROUP BY day, country
NODE result
SQL >
%
SELECT * FROM daily_sales
LIMIT {{Int32(page_size, 100)}}
OFFSET {{Int32(page, 0) * Int32(page_size, 100)}}
Include¶
Use .incl
Datafiles to separate connector settings and include them in your .datasource
files or reuse Pipe templates. See some examples below.
Include connector settings¶
Separate connector settings from .datasource files. The first snippet below is the .incl file with the Kafka connection settings (connections/kafka_connection.incl); the second is the .datasource file that includes it:
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
SCHEMA >
`value` String,
`topic` LowCardinality(String),
`partition` Int16,
`offset` Int64,
`timestamp` DateTime,
`key` String
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
INCLUDE "connections/kafka_connection.incl"
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
Include Pipe nodes¶
You can also use .incl Datafiles to reuse Pipe node templates. The first snippet below is the .incl file (./includes/only_buy_events.incl); the two snippets after it are .pipe Datafiles that include it:
NODE only_buy_events
SQL >
SELECT
toDate(timestamp) date,
product,
color,
JSONExtractFloat(json, 'price') as price
FROM events
where action = 'buy'
INCLUDE "./includes/only_buy_events.incl"
NODE endpoint
DESCRIPTION >
return sales for a product with color filter
SQL >
%
select date, sum(price) total_sales
from only_buy_events
where color in {{Array(colors, 'black')}}
group by date
INCLUDE "./includes/only_buy_events.incl"
NODE top_per_day
SQL >
SELECT date,
topKState(10)(product) top_10,
sumState(price) total_sales
from only_buy_events
group by date
TYPE MATERIALIZED
DATASOURCE mv_top_per_day
Include with variables¶
You can also templatize .incl Datafiles. For instance, you can reuse the same .incl template with different variable values. The first snippet below is the .incl template (./includes/top_products.incl); the two INCLUDE lines after it belong to two separate .pipe Datafiles:
NODE endpoint
DESCRIPTION >
returns top 10 products for the last week
SQL >
select
date,
topKMerge(10)(top_10) as top_10
from top_product_per_day
{% if '$DATE_FILTER' = 'last_week' %}
where date > today() - interval 7 day
{% else %}
where date between {{Date(start)}} and {{Date(end)}}
{% end %}
group by date
INCLUDE "./includes/top_products.incl" "DATE_FILTER=last_week"
INCLUDE "./includes/top_products.incl" "DATE_FILTER=between_dates"
Note that $DATE_FILTER is used as a variable in the .incl Datafile, and two separate endpoints are then created by injecting a different value for the DATE_FILTER variable, so the final .pipe Datafiles end up being different.
Include with Environment variables¶
INCLUDE Datafiles are expanded by the CLI, which means you can expand Environment variables as well. For example:
INCLUDE "./includes/${ENV}_helper.incl"
NODE endpoint
SQL >
SELECT *
FROM auxiliar_node
NODE auxiliar_node
SQL >
SELECT 1 as value
Then push the Pipe with the ENV Environment variable set, so the CLI expands the INCLUDE path:
ENV=stg tb push pipes/endpoint_pipe.pipe