Datafiles

Datafiles are text files that describe your Tinybird resources: Data Sources, Pipes, etc.

You can use Datafiles to manage your projects as source code and take advantage of version control.

Types of Datafiles

  • Data Source: uses the .datasource extension. Represents a Data Source and should follow the Data Source syntax.
  • Pipe: uses the .pipe extension. Represents a Pipe and should follow the Pipe syntax.
  • Include: uses the .incl extension. A reusable Datafile fragment that can be included in .datasource or .pipe files.

Basic syntax

Datafiles follow a simple syntax:

Basic syntax
CMD value
OTHER_CMD "value with multiple words"

or

Multiline syntax
CMD >
    multi
    line
    values
    are indented

A simple example:

Schema syntax
DESCRIPTION generated from /Users/username/tmp/sample.csv

SCHEMA >
    `d` DateTime,
    `total` Int32,
    `from_novoa` Int16

Schema syntax with jsonpath
DESCRIPTION generated from /Users/username/tmp/sample.ndjson

SCHEMA >
    `d` DateTime `json:$.d`,
    `total` Int32 `json:$.total`,
    `from_novoa` Int16 `json:$.from_novoa`

Read more about JSONPaths.

Data Source

  • SCHEMA <schema_definition> - (Required) Defines a block for the Data Source schema. Only valid in .datasource files. The block has to be indented.
  • DESCRIPTION <markdown_string> - (Optional) Sets the description for the Data Source.
  • TOKEN <token_name> APPEND - (Optional) Grants append access to the Data Source to the token with name <token_name>. If the token doesn't exist, it's created automatically.
  • ENGINE <engine_type> - (Optional) Sets the ClickHouse Engine for the Data Source. Default: MergeTree.
  • ENGINE_SORTING_KEY <sql> - (Optional) Sets the ClickHouse ORDER BY expression for the Data Source. If not set, it defaults to DateTime, numeric, or String columns, in that order.
  • ENGINE_PARTITION_KEY <sql> - (Optional) Sets the ClickHouse PARTITION BY expression for the Data Source.
  • ENGINE_TTL <sql> - (Optional) Sets the ClickHouse TTL expression for the Data Source.
  • ENGINE_VER <column_name> - (Required when ENGINE is ReplacingMergeTree) The column with the version of the object state.
  • ENGINE_SIGN <column_name> - (Required when ENGINE is CollapsingMergeTree or VersionedCollapsingMergeTree) The column used to compute the state.
  • ENGINE_VERSION <column_name> - (Required when ENGINE is VersionedCollapsingMergeTree) The column with the version of the object state.

datasources/example.datasource
TOKEN tracker APPEND

DESCRIPTION >
    Analytics events **landing data source**

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `session_id` String `json:$.session_id`,
    `action` LowCardinality(String) `json:$.action`,
    `version` LowCardinality(String) `json:$.version`,
    `payload` String `json:$.payload`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

SCHEMA

A SCHEMA definition is a comma-separated list of column definitions, one per line.

Each column is represented as `column_name` data_type json_path default_value, where column_name is the name of the column in the Data Source and data_type is one of the supported Data Types.

json_path is optional and only required for NDJSON Data Sources. Read more about JSONPaths.

default_value sets a default value for the column when it's null. It's not supported on NDJSON Data Sources. A common use case is setting a default date on a column, for example updated_at DateTime DEFAULT now().
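
For example, here's a minimal sketch of a schema that uses a default value (the column names are illustrative):

Schema syntax with a default value
SCHEMA >
    `product_id` Int32,
    `price` Float32,
    `updated_at` DateTime DEFAULT now()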

ENGINE and settings

Engine can be one of: MergeTree, ReplacingMergeTree, SummingMergeTree, AggregatingMergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree, Null.

Read the supported engine and settings documentation for more info.
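
As an illustrative sketch (the file name and columns below are hypothetical), a ReplacingMergeTree Data Source that keeps the latest row per sorting key using a version column could be declared like this:

datasources/latest_value.datasource - illustrative ReplacingMergeTree Data Source
SCHEMA >
    `key` String,
    `updated_at` DateTime,
    `value` Float64

ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "key"
ENGINE_VER "updated_at"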

Connectors

You describe connectors as part of the .datasource Datafile. You can use an Include to manage connection settings and credentials separately.

Kafka and Confluent

  • KAFKA_CONNECTION_NAME: (Required) The name of the configured Kafka connection in Tinybird
  • KAFKA_BOOTSTRAP_SERVERS: (Required) A comma-separated list of one or more Kafka brokers (including Port numbers)
  • KAFKA_KEY: (Required) The key used to authenticate with Kafka, sometimes called Key, Client Key, or Username, depending on the Kafka distribution
  • KAFKA_SECRET: (Required) The secret used to authenticate with Kafka, sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution
  • KAFKA_TOPIC: (Required) The name of the Kafka topic to consume from
  • KAFKA_GROUP_ID: (Required) The Kafka Consumer Group ID to use when consuming from Kafka
  • KAFKA_AUTO_OFFSET_RESET: (Optional) The offset to use when no previous offset can be found, e.g. when creating a new consumer. Supported values: latest, earliest. Default: latest.
  • KAFKA_STORE_RAW_VALUE: (Optional) Stores the raw message in its entirety as an additional column. Supported values: 'True', 'False'. Default: 'False'
  • KAFKA_SCHEMA_REGISTRY_URL: (Optional) URL of the Kafka schema registry
  • KAFKA_TARGET_PARTITIONS: (Optional)
  • KAFKA_STORE_HEADERS: (Optional)
  • KAFKA_KEY_AVRO_DESERIALIZATION: (Optional)

For example, to define a Data Source with a new Kafka connection in a .datasource file:

datasources/kafka.datasource - Data Source with a new Kafka connection
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

Or, to define a Data Source that uses an existing Kafka connection:

datasources/kafka.datasource - Data Source with an existing Kafka connection
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

Refer to the Kafka and Confluent connectors reference for more details about how to configure them.

Scheduled Connectors

  • IMPORT_SERVICE: (Required) Name of the import service to use. Valid values: bigquery, snowflake, s3.
  • IMPORT_SCHEDULE: (Required) A cron expression (UTC) with the frequency to run imports; the interval must be at least 5 minutes, e.g. */5 * * * *. Use @auto to sync once per minute (only available for s3), or @on-demand to only execute manually.
  • IMPORT_CONNECTION_NAME: (Required) The name given to the connection inside Tinybird, e.g. 'my_connection'.
  • IMPORT_STRATEGY: (Required) The strategy to use when inserting data, either REPLACE (for BigQuery and Snowflake) or APPEND (for s3).
  • IMPORT_BUCKET_URI: (Required when IMPORT_SERVICE is s3) A full bucket path, including the s3:// protocol, bucket name, object path, and an optional pattern to match against object keys. For example, s3://my-bucket/my-path discovers all files in the bucket my-bucket under the prefix /my-path. You can use patterns in the path to filter objects; for example, ending the path with *.csv matches all objects that end with the .csv suffix.
  • IMPORT_EXTERNAL_DATASOURCE: (Optional) The fully qualified name of the source table in BigQuery or Snowflake, e.g. project.dataset.table.
  • IMPORT_QUERY: (Optional) The SELECT query to extract your data from BigQuery or Snowflake when you don't need all the columns or want to apply a transformation before ingestion. The FROM clause must reference a table using the full scope: project.dataset.table.

Refer to the BigQuery, Snowflake and S3 connectors reference for more details about how to configure them.

BigQuery

datasources/bigquery.datasource - Data Source with a BigQuery connection
DESCRIPTION >
    bigquery demo data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `id` Integer `json:$.id`,
    `orderid` LowCardinality(String) `json:$.orderid`,
    `status` LowCardinality(String) `json:$.status`,
    `amount` Integer `json:$.amount`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE bigquery
IMPORT_SCHEDULE */5 * * * *
IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
IMPORT_STRATEGY REPLACE
IMPORT_QUERY >
    select
        timestamp,
        id,
        orderid,
        status,
        amount
    from mydb.raw.events

Snowflake

datasources/snowflake.datasource - Data Source with a Snowflake connection
DESCRIPTION >
    Snowflake demo data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `id` Integer `json:$.id`,
    `orderid` LowCardinality(String) `json:$.orderid`,
    `status` LowCardinality(String) `json:$.status`,
    `amount` Integer `json:$.amount`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE snowflake
IMPORT_CONNECTION_NAME my_snowflake_connection
IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
IMPORT_SCHEDULE */5 * * * *
IMPORT_STRATEGY REPLACE
IMPORT_QUERY >
    select
        timestamp,
        id,
        orderid,
        status,
        amount
    from mydb.raw.events

S3

datasources/s3.datasource - Data Source with an S3 connection
DESCRIPTION >
    Analytics events landing data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `session_id` String `json:$.session_id`,
    `action` LowCardinality(String) `json:$.action`,
    `version` LowCardinality(String) `json:$.version`,
    `payload` String `json:$.payload`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE s3
IMPORT_CONNECTION_NAME connection_name
IMPORT_BUCKET_URI s3://my-bucket/*.csv
IMPORT_SCHEDULE @auto
IMPORT_STRATEGY APPEND

Pipe

  • % - (Optional) Use as the first character of a node to indicate that the node uses the templating system.
  • DESCRIPTION <markdown_string> - (Optional) Sets the description for a node or for the complete file.
  • NODE <node_name> - (Required) Starts the definition of a new node. All the commands until a new NODE command or the end of the file relate to this node.
  • SQL <sql> - (Required) Defines a block for the SQL of a node. The block has to be indented (see the examples below).
  • INCLUDE <include_path.incl> <variables> - (Optional) Includes are pieces of a Pipe that can be reused in multiple .pipe Datafiles.
  • TYPE <pipe_type> - (Optional) Sets the type of the node. Can be set to 'MATERIALIZED' or 'COPY'.
  • DATASOURCE <data_source_name> - (Required when TYPE MATERIALIZED) Sets the destination Data Source for materialized nodes.
  • TARGET_DATASOURCE <data_source_name> - (Required when TYPE COPY) Sets the destination Data Source for copy nodes.
  • TOKEN <token_name> READ - (Optional) Grants read access to a Pipe/Endpoint to the token with name <token_name>. If the token doesn't exist, it's created automatically.
  • COPY_SCHEDULE - (Optional) A cron expression with the frequency to run copy jobs; the interval must be at least 5 minutes, e.g. */5 * * * *. If not defined, it defaults to @on-demand.

Materialized Pipe

Use it to define how each row ingested into the leftmost Data Source in the Pipe query is materialized into a target Data Source. Materialization happens on ingestion. More about Materialized Views.

pipes/sales_by_hour_mv.pipe
DESCRIPTION materialized pipe to aggregate sales per hour in the sales_by_hour Data Source

NODE daily_sales
SQL >
    SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales
    FROM teams
    GROUP BY day, country

TYPE MATERIALIZED
DATASOURCE sales_by_hour

Copy Pipe

Use it to define how to export the result of the Pipe to a Data Source, optionally with a schedule. More about Copy Pipes.

pipes/sales_by_hour_cp.pipe
DESCRIPTION copy pipe to export sales by hour every hour to the sales_hour_copy Data Source

NODE daily_sales
SQL >
    %
    SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales
    FROM teams
    WHERE
        day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now())
        AND country = {{ String(country, 'US') }}
    GROUP BY day, country

TYPE COPY
TARGET_DATASOURCE sales_hour_copy
COPY_SCHEDULE 0 * * * *

API Endpoint Pipe

Use it to define how to expose the result of the Pipe as an HTTP endpoint. More about API Endpoints.

pipes/sales_by_hour_endpoint.pipe
TOKEN dashboard READ
DESCRIPTION endpoint to get sales by hour filtering by date and country

NODE daily_sales
SQL >
    %
    SELECT day, country, sum(total_sales) as total_sales
    FROM sales_by_hour
    WHERE
        day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now())
        AND country = {{ String(country, 'US') }}
    GROUP BY day, country

NODE result
SQL >   
    %
    SELECT * FROM daily_sales
    LIMIT {{Int32(page_size, 100)}}
    OFFSET {{Int32(page, 0) * Int32(page_size, 100)}}

Include

Use .incl Datafiles to separate connector settings and include them in your .datasource files or reuse Pipe templates. See some examples below.

Include connector settings

Separate connector settings from .datasource files.

connections/kafka_connection.incl
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password

datasources/kafka_ds.datasource
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

INCLUDE "connections/kafka_connection.incl"

KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

Include Pipe nodes

You can also use .incl Datafiles to reuse templates:

includes/only_buy_events.incl
NODE only_buy_events
SQL >
    SELECT
        toDate(timestamp) date,
        product,
        color,
        JSONExtractFloat(json, 'price') as price
    FROM events
    where action = 'buy'

endpoints/sales.pipe
INCLUDE "./includes/only_buy_events.incl"

NODE endpoint
DESCRIPTION >
    return sales for a product with color filter
SQL >
    %
    select date, sum(price) total_sales
    from only_buy_events
    where color in {{Array(colors, 'black')}}
    group by date

pipes/top_per_day.pipe
INCLUDE "./includes/only_buy_events.incl"

NODE top_per_day
SQL >
    SELECT date,
        topKState(10)(product) top_10,
        sumState(price) total_sales
    FROM only_buy_events
    GROUP BY date

TYPE MATERIALIZED
DATASOURCE mv_top_per_day

Include with variables

You can also templatize .incl Datafiles. For instance you can reuse the same .incl template but with different variable values:

includes/top_products.incl
NODE endpoint
DESCRIPTION >
    returns top 10 products for the last week
SQL >
    %
    select
        date,
        topKMerge(10)(top_10) as top_10
    from top_product_per_day

    {% if '$DATE_FILTER' == 'last_week' %}
        where date > today() - interval 7 day
    {% else %}
        where date between {{Date(start)}} and {{Date(end)}}
    {% end %}

    group by date

endpoints/top_products_last_week.pipe
INCLUDE "./includes/top_products.incl" "DATE_FILTER=last_week"
endpoints/top_products_between_dates.pipe
INCLUDE "./includes/top_products.incl" "DATE_FILTER=between_dates"

Note that we include $DATE_FILTER as a variable in the .incl Datafile and then create two separate endpoints, injecting a different value for the DATE_FILTER variable in each, so the final .pipe Datafiles end up different.
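
As a sketch (not literal CLI output), once the include is expanded with DATE_FILTER=last_week, the first endpoint behaves as if its node were written like this:

endpoints/top_products_last_week.pipe - effective node after expansion (sketch)
NODE endpoint
DESCRIPTION >
    returns top 10 products for the last week
SQL >
    %
    select
        date,
        topKMerge(10)(top_10) as top_10
    from top_product_per_day
    where date > today() - interval 7 day
    group by date

The between_dates version keeps the {{Date(start)}} and {{Date(end)}} parameters instead.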

Include with Environment variables

INCLUDE Datafiles are expanded by the CLI, which means you can expand Environment variables as well.

Assuming you have configured the environment variables KAFKA_BOOTSTRAP_SERVERS, KAFKA_KEY, and KAFKA_SECRET, you can create a .incl Datafile like this:

connections/kafka_connection.incl
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS ${KAFKA_BOOTSTRAP_SERVERS}
KAFKA_KEY ${KAFKA_KEY}
KAFKA_SECRET ${KAFKA_SECRET}

And use these values in your .datasource Datafiles:

datasources/kafka_ds.datasource
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

INCLUDE "connections/kafka_connection.incl"

KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

Next steps